Reduce Unit Test Times (Part 3) (#3850)

* add coverage report

* define env vars in shared action

* reduce time for longest running tests

* fix broken shared action

* reduce test time

* reducing Pipeline test times

* further reducing test times

* rework Z3 test

* testing new mp.pool and persistent dist envs

* fix import

* reuse distributed environment for tests with lots of param combos

* fix for dist teardown

* fix pickling issue with pool cache

* actually fix pickling problem

* avoid running pool cache stuff on non-distributed tests

* fix issues with nested mp.pool

* fix for nested pools in Pipeline Engine

* re-add params

* update workflows with pytest opts

* implement feedback

* resolve race condition with port selection

* Update tests/unit/common.py

---------

Co-authored-by: Olatunji Ruwase <olruwase@microsoft.com>
This commit is contained in:
Michael Wyatt
2023-07-11 17:35:49 -07:00
committed by GitHub
parent e59f69a8ff
commit aef6c65ce3
35 changed files with 456 additions and 408 deletions

View File

@ -23,7 +23,7 @@ jobs:
- name: Install pytorch
run: |
pip install torch==1.13.1 torchvision --extra-index-url https://download.pytorch.org/whl/rocm5.1.1
pip install --cache-dir $TORCH_CACHE torch==1.13.1 torchvision --extra-index-url https://download.pytorch.org/whl/rocm5.1.1
python -c "import torch; print('torch:', torch.__version__, torch)"
python -c "import torch; print('CUDA available:', torch.cuda.is_available())"
@ -50,7 +50,7 @@ jobs:
# Runs a set of commands using the runners shell
- name: Unit tests
run: |
if [[ -d ./torch-extensions ]]; then rm -rf ./torch-extensions; fi
unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch
cd tests
TORCH_EXTENSIONS_DIR=./torch-extensions pytest -n 4 --verbose unit/
TORCH_EXTENSIONS_DIR=./torch-extensions pytest -m 'sequential' unit/
pytest $PYTEST_OPTS -n 4 --verbose unit/
pytest $PYTEST_OPTS -m 'sequential' unit/

View File

@ -23,7 +23,7 @@ jobs:
- name: Install pytorch
run: |
pip install -U --cache-dir /blob/torch_cache torch torchvision --extra-index-url https://download.pytorch.org/whl/rocm5.4.2
pip install -U --cache-dir $TORCH_CACHE torch torchvision --extra-index-url https://download.pytorch.org/whl/rocm5.4.2
python -c "import torch; print('torch:', torch.__version__, torch)"
python -c "import torch; print('CUDA available:', torch.cuda.is_available())"
@ -58,7 +58,7 @@ jobs:
# Runs a set of commands using the runners shell
- name: Unit tests
run: |
if [[ -d ./torch-extensions ]]; then rm -rf ./torch-extensions; fi
unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch
cd tests
TORCH_EXTENSIONS_DIR=./torch-extensions pytest -n 4 --verbose unit/
TORCH_EXTENSIONS_DIR=./torch-extensions pytest -m 'sequential' unit/
pytest $PYTEST_OPTS -n 4 --verbose unit/
pytest $PYTEST_OPTS -m 'sequential' unit/

View File

@ -75,7 +75,6 @@ jobs:
run: |
source oneCCL/build/_install/env/setvars.sh
unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch
if [[ -d ./torch-extensions ]]; then rm -rf ./torch-extensions; fi
cd tests
TRANSFORMERS_CACHE=~/tmp/transformers_cache/ TORCH_EXTENSIONS_DIR=./torch-extensions pytest -m 'inference' unit/inference/test_inference_config.py
TRANSFORMERS_CACHE=~/tmp/transformers_cache/ TORCH_EXTENSIONS_DIR=./torch-extensions pytest -k TestDistAllReduce unit/comm/test_dist.py
TRANSFORMERS_CACHE=~/tmp/transformers_cache/ pytest $PYTEST_OPTS -m 'inference' unit/inference/test_inference_config.py
TRANSFORMERS_CACHE=~/tmp/transformers_cache/ pytest $PYTEST_OPTS -k TestDistAllReduce unit/comm/test_dist.py

View File

@ -26,7 +26,7 @@ jobs:
- name: Install pytorch
run: |
pip install -U --cache-dir /blob/torch_cache torch torchvision --extra-index-url https://download.pytorch.org/whl/cu111
pip install -U --cache-dir $TORCH_CACHE torch torchvision --extra-index-url https://download.pytorch.org/whl/cu111
python -c "import torch; print('torch:', torch.__version__, torch)"
python -c "import torch; print('CUDA available:', torch.cuda.is_available())"
@ -41,7 +41,7 @@ jobs:
- name: HF Accelerate tests
run: |
if [[ -d ./torch-extensions ]]; then rm -rf ./torch-extensions; fi
unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch
git clone https://github.com/huggingface/accelerate
cd accelerate
git rev-parse --short HEAD
@ -52,4 +52,4 @@ jobs:
# tmp fix: force newer datasets version
#pip install "datasets>=2.0.0"
pip list
HF_DATASETS_CACHE=/blob/datasets_cache/ TRANSFORMERS_CACHE=/blob/transformers_cache/ TORCH_EXTENSIONS_DIR=./torch-extensions pytest --color=yes --durations=0 --verbose tests/deepspeed
pytest $PYTEST_OPTS --color=yes --durations=0 --verbose tests/deepspeed

View File

@ -46,7 +46,6 @@ jobs:
- name: Unit tests
run: |
unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch
if [[ -d ./torch-extensions ]]; then rm -rf ./torch-extensions; fi
cd tests
TORCH_EXTENSIONS_DIR=./torch-extensions python -m pytest -n 4 unit/ --torch_ver="2.0" --cuda_ver="12"
TORCH_EXTENSIONS_DIR=./torch-extensions python -m pytest -m 'sequential' unit/ --torch_ver="2.0" --cuda_ver="12"
python -m pytest $PYTEST_OPTS -n 4 unit/ --torch_ver="2.0" --cuda_ver="12"
python -m pytest $PYTEST_OPTS -m 'sequential' unit/ --torch_ver="2.0" --cuda_ver="12"

View File

@ -26,7 +26,7 @@ jobs:
- name: Install pytorch
run: |
pip install -U --cache-dir /blob/torch_cache torch==1.13.1 torchvision --extra-index-url https://download.pytorch.org/whl/cu116
pip install -U --cache-dir $TORCH_CACHE torch==1.13.1 torchvision --extra-index-url https://download.pytorch.org/whl/cu116
python -c "import torch; print('torch:', torch.__version__, torch)"
python -c "import torch; print('CUDA available:', torch.cuda.is_available())"
@ -49,8 +49,13 @@ jobs:
- name: Unit tests
run: |
unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch
if [[ -d ./torch-extensions ]]; then rm -rf ./torch-extensions; fi
cd tests
TRANSFORMERS_CACHE=/blob/transformers_cache/ TORCH_EXTENSIONS_DIR=./torch-extensions pytest -m 'seq_inference' unit/ --torch_ver="1.13" --cuda_ver="11.6"
TRANSFORMERS_CACHE=/blob/transformers_cache/ TORCH_EXTENSIONS_DIR=./torch-extensions pytest -m 'inference_ops' unit/ --torch_ver="1.13" --cuda_ver="11.6"
TRANSFORMERS_CACHE=/blob/transformers_cache/ TORCH_EXTENSIONS_DIR=./torch-extensions pytest --forked -n 4 -m 'inference' unit/ --torch_ver="1.13" --cuda_ver="11.6"
coverage run --concurrency=multiprocessing -m pytest $PYTEST_OPTS -m 'seq_inference' unit/ --torch_ver="1.13" --cuda_ver="11.6"
coverage run --concurrency=multiprocessing -m pytest $PYTEST_OPTS -m 'inference_ops' unit/ --torch_ver="1.13" --cuda_ver="11.6"
coverage run --concurrency=multiprocessing -m pytest $PYTEST_OPTS --forked -n 4 -m 'inference' unit/ --torch_ver="1.13" --cuda_ver="11.6"
- name: Coverage report
run: |
cd tests
coverage combine
coverage report -m

View File

@ -26,7 +26,7 @@ jobs:
- name: Install pytorch
run: |
pip install -U --cache-dir /blob/torch_cache torch torchvision --extra-index-url https://download.pytorch.org/whl/cu116
pip install -U --cache-dir $TORCH_CACHE torch torchvision --extra-index-url https://download.pytorch.org/whl/cu116
python -c "import torch; print('torch:', torch.__version__, torch)"
python -c "import torch; print('CUDA available:', torch.cuda.is_available())"
@ -41,8 +41,8 @@ jobs:
- name: PyTorch Lightning Tests
run: |
if [[ -d ./torch-extensions ]]; then rm -rf ./torch-extensions; fi
unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch
pip install pytorch-lightning
pip install "protobuf<4.21.0"
cd tests
TORCH_EXTENSIONS_DIR=./torch-extensions pytest --color=yes --durations=0 --verbose lightning/
pytest $PYTEST_OPTS lightning/

View File

@ -26,7 +26,7 @@ jobs:
- name: Install pytorch
run: |
pip install -U --cache-dir /blob/torch_cache torch==1.13.1 torchvision --extra-index-url https://download.pytorch.org/whl/cu116
pip install -U --cache-dir $TORCH_CACHE torch==1.13.1 torchvision --extra-index-url https://download.pytorch.org/whl/cu116
python -c "import torch; print('torch:', torch.__version__, torch)"
python -c "import torch; print('CUDA available:', torch.cuda.is_available())"
@ -57,6 +57,5 @@ jobs:
cd Megatron-DeepSpeed
pip install .
unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch
if [[ -d ./torch-extensions ]]; then rm -rf ./torch-extensions; fi
cd tests
MEGATRON_CKPT_DIR=/blob/megatron_ckpt/ TORCH_EXTENSIONS_DIR=./torch-extensions pytest --color=yes --durations=0 --verbose ./
pytest $PYTEST_OPTS ./

View File

@ -26,7 +26,7 @@ jobs:
- name: Install pytorch
run: |
pip3 install -U --cache-dir /blob/torch_cache torch
pip3 install -U --cache-dir $TORCH_CACHE torch
python -c "import torch; print('torch:', torch.__version__, torch)"
python -c "import torch; print('CUDA available:', torch.cuda.is_available())"
@ -54,6 +54,5 @@ jobs:
cd DeepSpeed-MII
pip install .[dev]
unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch
if [[ -d ./torch-extensions ]]; then rm -rf ./torch-extensions; fi
cd tests
TRANSFORMERS_CACHE=/blob/transformers_cache/ TORCH_EXTENSIONS_DIR=./torch-extensions pytest --color=yes --durations=0 --forked --verbose -m "deepspeed" ./
pytest $PYTEST_OPTS --forked -m "deepspeed" ./

View File

@ -20,7 +20,7 @@ jobs:
- name: Install pytorch
run: |
pip install -U --cache-dir /blob/torch_cache torch==1.13.1 torchvision --extra-index-url https://download.pytorch.org/whl/cu116
pip install -U --cache-dir $TORCH_CACHE torch==1.13.1 torchvision --extra-index-url https://download.pytorch.org/whl/cu116
python -c "import torch; print('torch:', torch.__version__, torch)"
python -c "import torch; print('CUDA available:', torch.cuda.is_available())"
@ -45,6 +45,5 @@ jobs:
- name: Unit tests
run: |
unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch
if [[ -d ./torch-extensions ]]; then rm -rf ./torch-extensions; fi
cd tests
TRANSFORMERS_CACHE=/blob/transformers_cache/ TORCH_EXTENSIONS_DIR=./torch-extensions pytest --forked -m 'nightly' unit/ --torch_ver="1.13" --cuda_ver="11.6"
pytest $PYTEST_OPTS --forked -m 'nightly' unit/ --torch_ver="1.13" --cuda_ver="11.6"

View File

@ -42,7 +42,6 @@ jobs:
- name: Unit tests
run: |
unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch
if [[ -d ./torch-extensions ]]; then rm -rf ./torch-extensions; fi
cd tests
TORCH_EXTENSIONS_DIR=./torch-extensions pytest -n 4 unit/ --torch_ver="1.12"
TORCH_EXTENSIONS_DIR=./torch-extensions pytest -m 'sequential' unit/ --torch_ver="1.12"
TRANSFORMERS_CACHE=/tmp/transformers_cache/ pytest $PYTEST_OPTS -n 4 unit/ --torch_ver="1.12"
TRANSFORMERS_CACHE=/tmp/transformers_cache/ pytest $PYTEST_OPTS -m 'sequential' unit/ --torch_ver="1.12"

View File

@ -26,7 +26,7 @@ jobs:
- name: Install pytorch
run: |
pip install -U --cache-dir /blob/torch_cache torch torchvision --extra-index-url https://download.pytorch.org/whl/cu116
pip install -U --cache-dir $TORCH_CACHE torch torchvision --extra-index-url https://download.pytorch.org/whl/cu116
python -c "import torch; print('torch:', torch.__version__, torch)"
python -c "import torch; print('CUDA available:', torch.cuda.is_available())"
@ -51,7 +51,12 @@ jobs:
- name: Unit tests
run: |
unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch
if [[ -d ./torch-extensions ]]; then rm -rf ./torch-extensions; fi
cd tests
TORCH_EXTENSIONS_DIR=./torch-extensions pytest --forked -n 4 unit/ --torch_ver="2.0" --cuda_ver="11.7"
TORCH_EXTENSIONS_DIR=./torch-extensions pytest --forked -m 'sequential' unit/ --torch_ver="2.0" --cuda_ver="11.7"
coverage run --concurrency=multiprocessing -m pytest $PYTEST_OPTS --forked -n 4 unit/ --torch_ver="2.0" --cuda_ver="11.7"
coverage run --concurrency=multiprocessing -m pytest $PYTEST_OPTS --forked -m 'sequential' unit/ --torch_ver="2.0" --cuda_ver="11.7"
- name: Coverage report
run: |
cd tests
coverage combine
coverage report -m

View File

@ -45,7 +45,6 @@ jobs:
- name: Unit tests
run: |
unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch
if [[ -d ./torch-extensions ]]; then rm -rf ./torch-extensions; fi
cd tests
TORCH_EXTENSIONS_DIR=./torch-extensions pytest --forked -n 4 unit/
TORCH_EXTENSIONS_DIR=./torch-extensions pytest --forked -m 'sequential' unit/
pytest $PYTEST_OPTS --forked -n 4 unit/
pytest $PYTEST_OPTS --forked -m 'sequential' unit/

View File

@ -20,7 +20,7 @@ jobs:
- name: Install pytorch
run: |
pip install -U --cache-dir /blob/torch_cache torch==1.9.0+cu111 torchvision==0.10.0+cu111 -f https://download.pytorch.org/whl/torch_stable.html
pip install -U --cache-dir $TORCH_CACHE torch==1.9.0+cu111 torchvision==0.10.0+cu111 -f https://download.pytorch.org/whl/torch_stable.html
python -c "import torch; print('torch:', torch.__version__, torch)"
python -c "import torch; print('CUDA available:', torch.cuda.is_available())"
@ -44,6 +44,6 @@ jobs:
- name: Unit tests
run: |
if [[ -d ./torch-extensions ]]; then rm -rf ./torch-extensions; fi
unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch
cd tests
TORCH_EXTENSIONS_DIR=./torch-extensions pytest --forked -n 4 unit/ --torch_ver="1.9" --cuda_ver="11.1"
pytest $PYTEST_OPTS --forked -n 4 unit/ --torch_ver="1.9" --cuda_ver="11.1"

View File

@ -20,7 +20,7 @@ jobs:
- name: Install pytorch
run: |
pip install -U --cache-dir /blob/torch_cache torch==1.9.0+cu111 torchvision==0.10.0+cu111 -f https://download.pytorch.org/whl/torch_stable.html
pip install -U --cache-dir $TORCH_CACHE torch==1.9.0+cu111 torchvision==0.10.0+cu111 -f https://download.pytorch.org/whl/torch_stable.html
python -c "import torch; print('torch:', torch.__version__, torch)"
python -c "import torch; print('CUDA available:', torch.cuda.is_available())"
@ -45,7 +45,6 @@ jobs:
- name: Unit tests
run: |
unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch
if [[ -d ./torch-extensions ]]; then rm -rf ./torch-extensions; fi
cd tests
TORCH_EXTENSIONS_DIR=./torch-extensions pytest --forked -n 4 unit/ --torch_ver="1.9" --cuda_ver="11"
TORCH_EXTENSIONS_DIR=./torch-extensions pytest --forked -m 'sequential' unit/ --torch_ver="1.9" --cuda_ver="11"
pytest $PYTEST_OPTS --forked -n 4 unit/ --torch_ver="1.9" --cuda_ver="11"
pytest $PYTEST_OPTS --forked -m 'sequential' unit/ --torch_ver="1.9" --cuda_ver="11"

View File

@ -27,7 +27,7 @@ jobs:
- name: Install pytorch
run: |
# use the same pytorch version as transformers CI
pip install -U --cache-dir /blob/torch_cache torch torchvision torchaudio -f https://download.pytorch.org/whl/torch_stable.html
pip install -U --cache-dir $TORCH_CACHE torch torchvision torchaudio -f https://download.pytorch.org/whl/torch_stable.html
python -c "import torch; print('torch:', torch.__version__, torch)"
python -c "import torch; print('CUDA available:', torch.cuda.is_available())"
@ -42,7 +42,7 @@ jobs:
- name: HF transformers tests
run: |
if [[ -d ./torch-extensions ]]; then rm -rf ./torch-extensions; fi
unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch
git clone https://github.com/huggingface/transformers
cd transformers
# if needed switch to the last known good SHA until transformers@master is fixed
@ -57,4 +57,4 @@ jobs:
# force protobuf version due to issues
pip install "protobuf<4.21.0"
pip list
HF_DATASETS_CACHE=/blob/datasets_cache/ TRANSFORMERS_CACHE=/blob/transformers_cache/ WANDB_DISABLED=true TORCH_EXTENSIONS_DIR=./torch-extensions RUN_SLOW=1 pytest --color=yes --durations=0 --verbose tests/deepspeed
WANDB_DISABLED=true RUN_SLOW=1 pytest $PYTEST_OPTS tests/deepspeed

View File

@ -18,6 +18,16 @@ runs:
pip install wheel # required after pip>=23.1
echo PATH=$PATH >> $GITHUB_ENV # Make it so venv is inherited for other steps
shell: bash
- id: set-env-vars
run: |
echo TEST_DATA_DIR=/blob/ >> $GITHUB_ENV
echo TRANSFORMERS_CACHE=/blob/transformers_cache/ >> $GITHUB_ENV
echo TORCH_EXTENSIONS_DIR=./torch-extensions/ >> $GITHUB_ENV
echo TORCH_CACHE=/blob/torch_cache/ >> $GITHUB_ENV
echo HF_DATASETS_CACHE=/blob/datasets_cache/ >> $GITHUB_ENV
echo MEGATRON_CKPT_DIR=/blob/megatron_ckpt/ >> $GITHUB_ENV
echo PYTEST_OPTS="--color=yes --durations=0 --verbose -rF" >> $GITHUB_ENV
shell: bash
- id: print-env
run: |
which python

View File

@ -1,4 +1,5 @@
clang-format==16.0.2
coverage
docutils<0.18
future
importlib-metadata>=4

5
tests/.coveragerc Normal file
View File

@ -0,0 +1,5 @@
# .coveragerc to control coverage.py
[run]
parallel = True
sigterm = True
source = deepspeed

View File

@ -70,10 +70,18 @@ def pytest_runtest_call(item):
item.runtest = lambda: True # Dummy function so test is not run twice
# We allow DistributedTest to reuse distributed environments. When the last
# test for a class is run, we want to make sure those distributed environments
# are destroyed.
def pytest_runtest_teardown(item, nextitem):
if getattr(item.cls, "reuse_dist_env", False) and not nextitem:
dist_test_class = item.cls()
for num_procs, pool in dist_test_class._pool_cache.items():
dist_test_class._close_pool(pool, num_procs, force=True)
@pytest.hookimpl(tryfirst=True)
def pytest_fixture_setup(fixturedef, request):
if getattr(fixturedef.func, "is_dist_fixture", False):
#for val in dir(request):
# print(val.upper(), getattr(request, val), "\n")
dist_fixture_class = fixturedef.func()
dist_fixture_class(request)

View File

@ -4,6 +4,7 @@
# DeepSpeed Team
import pytest
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
@ -98,7 +99,12 @@ def cifar_trainset(fp16=False):
dist.barrier()
if local_rank != 0:
dist.barrier()
trainset = torchvision.datasets.CIFAR10(root='/blob/cifar10-data', train=True, download=True, transform=transform)
data_root = os.getenv("TEST_DATA_DIR", "/tmp/")
trainset = torchvision.datasets.CIFAR10(root=os.path.join(data_root, "cifar10-data"),
train=True,
download=True,
transform=transform)
if local_rank == 0:
dist.barrier()
return trainset
@ -114,6 +120,18 @@ def train_cifar(model, config, num_steps=400, average_dp_losses=True, fp16=True,
trainset = cifar_trainset(fp16=fp16)
config['local_rank'] = dist.get_rank()
# deepspeed_io defaults to creating a dataloader that uses a
# multiprocessing pool. Our tests use pools and we cannot nest pools in
# python. Therefore we're injecting this kwarg to ensure that no pools
# are used in the dataloader.
old_method = deepspeed.runtime.engine.DeepSpeedEngine.deepspeed_io
def new_method(*args, **kwargs):
kwargs["num_local_io_workers"] = 0
return old_method(*args, **kwargs)
deepspeed.runtime.engine.DeepSpeedEngine.deepspeed_io = new_method
engine, _, _, _ = deepspeed.initialize(config=config,
model=model,
model_parameters=[p for p in model.parameters()],

View File

@ -4,8 +4,11 @@
# DeepSpeed Team
import os
import re
import time
import inspect
import socket
import subprocess
from abc import ABC, abstractmethod
from pathlib import Path
@ -14,7 +17,6 @@ import torch.multiprocessing as mp
import deepspeed
from deepspeed.accelerator import get_accelerator
import deepspeed.comm as dist
from torch.multiprocessing import Process
import pytest
from _pytest.outcomes import Skipped
@ -40,11 +42,10 @@ def get_xdist_worker_id():
def get_master_port():
master_port = os.environ.get('DS_TEST_PORT', '29503')
xdist_worker_id = get_xdist_worker_id()
if xdist_worker_id is not None:
master_port = str(int(master_port) + xdist_worker_id)
return master_port
# Select a random open port
with socket.socket() as s:
s.bind(('', 0))
return str(s.getsockname()[1])
def set_accelerator_visible():
@ -54,7 +55,6 @@ def set_accelerator_visible():
xdist_worker_id = 0
if cuda_visible is None:
# CUDA_VISIBLE_DEVICES is not set, discover it using accelerator specific command instead
import subprocess
if get_accelerator().device_name() == 'cuda':
if is_rocm_pytorch():
rocm_smi = subprocess.check_output(['rocm-smi', '--showid'])
@ -64,7 +64,6 @@ def set_accelerator_visible():
nvidia_smi = subprocess.check_output(['nvidia-smi', '--list-gpus'])
num_accelerators = len(nvidia_smi.decode('utf-8').strip().split('\n'))
elif get_accelerator().device_name() == 'xpu':
import re
clinfo = subprocess.check_output(['clinfo'])
lines = clinfo.decode('utf-8').strip().split('\n')
num_accelerators = 0
@ -100,6 +99,8 @@ class DistributedExec(ABC):
init_distributed = True
set_dist_env = True
requires_cuda_env = True
reuse_dist_env = False
_pool_cache = {}
@abstractmethod
def run(self):
@ -115,7 +116,6 @@ class DistributedExec(ABC):
world_size = [world_size]
for procs in world_size:
self._launch_procs(procs)
time.sleep(0.5)
def _get_fixture_kwargs(self, request, func):
if not request:
@ -132,92 +132,92 @@ class DistributedExec(ABC):
return fixture_kwargs
def _launch_procs(self, num_procs):
# Verify we have enough accelerator devices to run this test
if get_accelerator().is_available() and get_accelerator().device_count() < num_procs:
pytest.skip(
f"Skipping test because not enough GPUs are available: {num_procs} required, {get_accelerator().device_count()} available"
)
# Set start method to `forkserver` (or `fork`)
mp.set_start_method('forkserver', force=True)
skip_msg = mp.Queue() # Allows forked processes to share pytest.skip reason
processes = []
for local_rank in range(num_procs):
p = Process(target=self._dist_init, args=(local_rank, num_procs, skip_msg))
p.start()
processes.append(p)
# Now loop and wait for a test to complete. The spin-wait here isn't a big
# deal because the number of processes will be O(#GPUs) << O(#CPUs).
any_done = False
start = time.time()
while (not any_done) and ((time.time() - start) < DEEPSPEED_TEST_TIMEOUT):
for p in processes:
if not p.is_alive():
any_done = True
break
time.sleep(.1) # So we don't hog CPU
# Create process pool or use cached one
master_port = None
if self.reuse_dist_env:
if num_procs not in self._pool_cache:
self._pool_cache[num_procs] = mp.Pool(processes=num_procs)
master_port = get_master_port()
pool = self._pool_cache[num_procs]
else:
pool = mp.Pool(processes=num_procs)
master_port = get_master_port()
# If we hit the timeout, then presume a test is hanged
if not any_done:
for p in processes:
p.terminate()
# Run the test
args = [(local_rank, num_procs, master_port) for local_rank in range(num_procs)]
skip_msgs_async = pool.starmap_async(self._dist_run, args)
try:
skip_msgs = skip_msgs_async.get(DEEPSPEED_TEST_TIMEOUT)
except mp.TimeoutError:
# Shortcut to exit pytest in the case of a hanged test. This
# usually means an environment error and the rest of tests will
# hang (causing super long unit test runtimes)
pytest.exit("Test hanged, exiting", returncode=0)
# Wait for all other processes to complete
for p in processes:
p.join(DEEPSPEED_UNIT_WORKER_TIMEOUT)
# Tear down distributed environment and close process pools
self._close_pool(pool, num_procs)
failed = [(rank, p) for rank, p in enumerate(processes) if p.exitcode != 0]
for rank, p in failed:
# If it still hasn't terminated, kill it because it hung.
if p.exitcode is None:
p.terminate()
pytest.fail(f'Worker {rank} hung.', pytrace=False)
if p.exitcode < 0:
pytest.fail(f'Worker {rank} killed by signal {-p.exitcode}', pytrace=False)
if p.exitcode > 0:
pytest.fail(f'Worker {rank} exited with code {p.exitcode}', pytrace=False)
# If we skipped a test, propagate that to this process
if any(skip_msgs):
assert len(set(skip_msgs)) == 1, "Multiple different skip messages received"
pytest.skip(skip_msgs[0])
if not skip_msg.empty():
# This assumed all skip messages are the same, it may be useful to
# add a check here to assert all exit messages are equal
pytest.skip(skip_msg.get())
def _dist_run(self, local_rank, num_procs, master_port):
skip_msg = ''
if not dist.is_initialized():
""" Initialize deepspeed.comm and execute the user function. """
if self.set_dist_env:
os.environ['MASTER_ADDR'] = '127.0.0.1'
os.environ['MASTER_PORT'] = str(master_port)
os.environ['LOCAL_RANK'] = str(local_rank)
# NOTE: unit tests don't support multi-node so local_rank == global rank
os.environ['RANK'] = str(local_rank)
os.environ['WORLD_SIZE'] = str(num_procs)
def _dist_init(self, local_rank, num_procs, skip_msg):
"""Initialize deepspeed.comm and execute the user function. """
if self.set_dist_env:
os.environ['MASTER_ADDR'] = '127.0.0.1'
os.environ['MASTER_PORT'] = get_master_port()
os.environ['LOCAL_RANK'] = str(local_rank)
# NOTE: unit tests don't support multi-node so local_rank == global rank
os.environ['RANK'] = str(local_rank)
os.environ['WORLD_SIZE'] = str(num_procs)
# turn off NCCL logging if set
os.environ.pop('NCCL_DEBUG', None)
# turn off NCCL logging if set
os.environ.pop('NCCL_DEBUG', None)
if get_accelerator().is_available():
set_accelerator_visible()
if get_accelerator().is_available():
set_accelerator_visible()
if self.init_distributed:
deepspeed.init_distributed(dist_backend=self.backend)
dist.barrier()
if self.init_distributed:
deepspeed.init_distributed(dist_backend=self.backend)
dist.barrier()
if get_accelerator().is_available():
get_accelerator().set_device(local_rank)
if get_accelerator().is_available():
get_accelerator().set_device(local_rank)
try:
self.run(**self._fixture_kwargs)
except BaseException as e:
if isinstance(e, Skipped):
skip_msg.put(e.msg)
skip_msg = e.msg
else:
raise e
if self.init_distributed or dist.is_initialized():
# make sure all ranks finish at the same time
return skip_msg
def _dist_destroy(self):
if (dist is not None) and dist.is_initialized():
dist.barrier()
# tear down after test completes
dist.destroy_process_group()
def _close_pool(self, pool, num_procs, force=False):
if force or not self.reuse_dist_env:
msg = pool.starmap(self._dist_destroy, [() for _ in range(num_procs)])
pool.close()
pool.join()
class DistributedFixture(DistributedExec):
"""

View File

@ -244,9 +244,7 @@ def run_backward(ds_config, seq_len, atol=1e-2, verbose=False):
check_equal(base_grads, ds_grads, atol=atol, verbose=verbose)
#test_backward[3-1024-120-16-24-True-True-0.05]
#test_backward[3-1024-52-16-24-False-True-0.2]
# 3-128-54-2-24-False-True-0.2
# NOTE: Keep these different params as they have helped find divergence in behavior between AMD and NVIDIA.
@pytest.mark.parametrize('batch_size, hidden_size, seq_len, heads, num_layers, is_preln, use_fp16, atol',
[
(64,160,128,2,24,False,True, 0.2),
@ -254,12 +252,6 @@ def run_backward(ds_config, seq_len, atol=1e-2, verbose=False):
(8,1600,128,25,3,True,True, 0.05),
(8,160,128,2,3,True,True, 0.1),
(8,1600,128,2,3,True,True, 0.05),
#(3,1024,119,16,24,True,False, 0.05),
#(3,1024,115,16,24,True,True, 0.05),
#(1024,128,10,2,2,False,False, 0.1),
#(3,1024,52,16,24,False,True, 0.2),
#(3,128,51,2,24,False,False, 0.1),
#(3,128,54,2,24,False,True, 0.2),
]) # yapf: disable
class TestCUDABackward(DistributedTest):
world_size = 1
@ -267,7 +259,7 @@ class TestCUDABackward(DistributedTest):
#This is to flush denorms in forward pass. Please refer to https://github.com/pytorch/pytorch/blob/main/docs/source/notes/numerical_accuracy.rst#reduced-precision-fp16-and-bf16-gemms-and-convolutions-on-amd-instinct-mi200-devices
os.environ['ROCBLAS_INTERNAL_FP16_ALT_IMPL'] = '1'
def test_backward(self, batch_size, hidden_size, seq_len, heads, num_layers, is_preln, use_fp16, atol):
def test_backward(self, is_preln, use_fp16, batch_size, hidden_size, seq_len, heads, num_layers, atol):
# Only run fp16 test cases on devices with FP16 capability.
if not get_accelerator().is_fp16_supported() and (use_fp16 is True or is_preln is False):
return
@ -286,38 +278,3 @@ class TestCUDABackward(DistributedTest):
ds_config.fp16 = use_fp16
run_backward(ds_config, seq_len, atol=atol, verbose=True)
# [
# (3,1024,128,16,24,True,False, 0.07),
# (3,1024,128,16,24,True,True, 0.05),
# (3,1024,128,16,24,False,False, 0.1),
# (3,1024,128,16,24,False,True, 0.2),
# ]) # yapf: disable
#def test_backward_stochastic(batch_size,
# hidden_size,
# seq_len,
# heads,
# num_layers,
# is_preln,
# use_fp16,
# atol):
# # Only run fp16 test cases on devices with FP16 capability.
# if not get_accelerator().is_fp16_supported() and use_fp16 is True:
# return
#
# ds_config = DeepSpeedTransformerConfig()
# ds_config.layer_id = None
# ds_config.batch_size = batch_size
# ds_config.hidden_size = hidden_size
# ds_config.intermediate_size = 4 * hidden_size
# ds_config.max_seq_length = seq_len
# ds_config.heads = heads
# ds_config.attn_dropout_ratio = 0.0
# ds_config.hidden_dropout_ratio = 0.0
# ds_config.num_hidden_layers = num_layers
# ds_config.pre_layer_norm = is_preln
# ds_config.initializer_range = 0.02
# ds_config.fp16 = use_fp16
# ds_config.stochastic_mode = True
#
# run_backward(ds_config, atol=atol)

View File

@ -224,6 +224,7 @@ def run_forward(ds_config, seq_len, atol=1e-2, verbose=False, test_bsz=None):
]) # yapf: disable
class TestCUDAForward(DistributedTest):
world_size = 1
reuse_dist_env = True
def test_forward(self, batch_size, hidden_size, seq_len, heads, num_layers, is_preln, use_fp16):
# Only run fp16 test cases on devices with FP16 capability.

View File

@ -34,17 +34,7 @@ class TestCPUAdagrad(DistributedTest):
init_distributed = False
set_dist_env = False
@pytest.mark.parametrize('model_size',
[
(64),
(22),
(55),
(127),
(1024),
(1048576),
(30000000),
]) # yapf: disable
def test_cpu_adagrad_opt(self, model_size):
def test_cpu_adagrad_opt(self, model_size=64):
device = 'cpu'
rng_state = torch.get_rng_state()
param = torch.nn.Parameter(torch.randn(model_size, device=device))
@ -65,14 +55,7 @@ class TestCPUAdagrad(DistributedTest):
check_equal(param, param1, atol=1e-2, verbose=True)
@pytest.mark.parametrize('model_size,vocabulary_size,dim',
[
(16 * 2, 16 * 4, 16),
(16 * 32, 16 * 256, 16),
(16 * 256, 16 * 16384, 16),
]) # yapf: disable
def test_cpu_adagrad_opt_sparse_embedding(self, model_size, vocabulary_size, dim):
def test_cpu_adagrad_opt_sparse_embedding(self, model_size=32, vocabulary_size=64, dim=16):
device = 'cpu'
rng_state = torch.get_rng_state()

View File

@ -36,6 +36,7 @@ adam_configs = [["AdamW", False, False, False, (FusedAdam, True)],
adam_configs)
class TestAdamConfigs(DistributedTest):
world_size = 1
reuse_dist_env = True
def test(self,
optimizer,

View File

@ -55,6 +55,7 @@ def _compare_optimizers(model_size, param1, optimizer1, param2, optimizer2):
]) # yapf: disable
class TestCPUAdam(DistributedTest):
world_size = 1
reuse_dist_env = True
requires_cuda_env = False
if not get_accelerator().is_available():
init_distributed = False

View File

@ -83,6 +83,7 @@ def _validate_handle_state(handle, single_submit, overlap_events):
@pytest.mark.parametrize("overlap_events", [True, False])
class TestRead(DistributedTest):
world_size = 1
reuse_dist_env = True
requires_cuda_env = False
if not get_accelerator().is_available():
init_distributed = False
@ -148,6 +149,7 @@ class TestRead(DistributedTest):
@pytest.mark.parametrize("overlap_events", [True, False])
class TestWrite(DistributedTest):
world_size = 1
reuse_dist_env = True
requires_cuda_env = False
if not get_accelerator().is_available():
init_distributed = False

View File

@ -8,7 +8,6 @@ import torch.nn as nn
import deepspeed.comm as dist
import deepspeed
import pytest
import copy
import os
import numpy as np
@ -334,18 +333,10 @@ class TestOneBitAdamCheckpointing(DistributedTest):
@pytest.mark.parametrize(
"topo_config",
[
{
"num_pp": 1,
"num_dp": 4
},
{
"num_pp": 2,
"num_dp": 2
},
{
"num_pp": 4,
"num_dp": 1
},
],
)
class TestOneBitAdamFP16Pipeline(DistributedTest):
@ -353,8 +344,8 @@ class TestOneBitAdamFP16Pipeline(DistributedTest):
def test(self, topo_config):
config_dict = {
"train_batch_size": 16,
"train_micro_batch_size_per_gpu": 4,
"train_batch_size": 4,
"grandient_accumulation_steps": 1,
"steps_per_print": 20,
"optimizer": {
"type": "OneBitAdam",
@ -384,20 +375,12 @@ class TestOneBitAdamFP16Pipeline(DistributedTest):
}
topo = PipeTopo(**topo_config)
steps = 500 # Must be >=100
steps = 100
# Allocate model for consistent initial weights.
init_net = AlexNetPipe()
test_net = copy.deepcopy(init_net)
# TODO: Add correctness tests/asserts comparing with baseline?
test_net = AlexNetPipe()
test_model = PipelineModule(layers=test_net.to_layers(), topology=topo, loss_fn=nn.CrossEntropyLoss())
test_losses = train_cifar(
test_model,
config=config_dict,
num_steps=steps,
fp16=config_dict["fp16"]["enabled"],
)
test_losses = train_cifar(test_model, config=config_dict, num_steps=steps, fp16=config_dict['fp16']['enabled'])
@pytest.mark.parametrize("dtype", [torch.float32, torch.float16], ids=["fp32", "fp16"])
@ -707,18 +690,10 @@ class TestZeroOneAdamCheckpointing(DistributedTest):
@pytest.mark.parametrize(
"topo_config",
[
{
"num_pp": 1,
"num_dp": 4
},
{
"num_pp": 2,
"num_dp": 2
},
{
"num_pp": 4,
"num_dp": 1
},
],
)
class TestZeroOneAdamFP16Pipeline(DistributedTest):
@ -726,8 +701,8 @@ class TestZeroOneAdamFP16Pipeline(DistributedTest):
def test(self, topo_config):
config_dict = {
"train_batch_size": 16,
"train_micro_batch_size_per_gpu": 4,
"train_batch_size": 4,
"grandient_accumulation_steps": 1,
"steps_per_print": 20,
"optimizer": {
"type": "ZeroOneAdam",
@ -760,20 +735,12 @@ class TestZeroOneAdamFP16Pipeline(DistributedTest):
}
topo = PipeTopo(**topo_config)
steps = 500 # Must be >=100
steps = 100
# Allocate model for consistent initial weights.
init_net = AlexNetPipe()
test_net = copy.deepcopy(init_net)
# TODO: Add correctness tests/asserts comparing with baseline?
test_net = AlexNetPipe()
test_model = PipelineModule(layers=test_net.to_layers(), topology=topo, loss_fn=nn.CrossEntropyLoss())
test_losses = train_cifar(
test_model,
config=config_dict,
num_steps=steps,
fp16=config_dict["fp16"]["enabled"],
)
test_losses = train_cifar(test_model, config=config_dict, num_steps=steps, fp16=config_dict['fp16']['enabled'])
@pytest.mark.parametrize("dtype", [torch.float32, torch.float16], ids=["fp32", "fp16"])
@ -1109,18 +1076,10 @@ class TestOneBitLambCheckpointing(DistributedTest):
@pytest.mark.parametrize(
"topo_config",
[
{
"num_pp": 1,
"num_dp": 4
},
{
"num_pp": 2,
"num_dp": 2
},
{
"num_pp": 4,
"num_dp": 1
},
],
)
class TestOneBitLambFP16Pipeline(DistributedTest):
@ -1128,8 +1087,8 @@ class TestOneBitLambFP16Pipeline(DistributedTest):
def test(self, topo_config):
config_dict = {
"train_batch_size": 16,
"train_micro_batch_size_per_gpu": 4,
"train_batch_size": 4,
"grandient_accumulation_steps": 1,
"steps_per_print": 20,
"optimizer": {
"type": "OneBitLamb",
@ -1159,20 +1118,12 @@ class TestOneBitLambFP16Pipeline(DistributedTest):
}
topo = PipeTopo(**topo_config)
steps = 500 # Must be >=100
steps = 100
# Allocate model for consistent initial weights.
init_net = AlexNetPipe()
test_net = copy.deepcopy(init_net)
# TODO: Add correctness tests/asserts comparing with baseline?
test_net = AlexNetPipe()
test_model = PipelineModule(layers=test_net.to_layers(), topology=topo, loss_fn=nn.CrossEntropyLoss())
test_losses = train_cifar(
test_model,
config=config_dict,
num_steps=steps,
fp16=config_dict["fp16"]["enabled"],
)
test_losses = train_cifar(test_model, config=config_dict, num_steps=steps, fp16=config_dict['fp16']['enabled'])
@pytest.mark.sequential

View File

@ -319,7 +319,7 @@ class TestAdamFP16ZeroOneCycleCompatibility(DistributedTest):
model = SimpleModel(hidden_dim)
model, _, _, _ = deepspeed.initialize(config=config_dict, model=model, model_parameters=model.parameters())
data_loader = random_dataloader(model=model, total_samples=50, hidden_dim=hidden_dim, device=model.device)
data_loader = random_dataloader(model=model, total_samples=10, hidden_dim=hidden_dim, device=model.device)
for n, batch in enumerate(data_loader):
loss = model(batch[0], batch[1])
model.backward(loss)
@ -328,11 +328,10 @@ class TestAdamFP16ZeroOneCycleCompatibility(DistributedTest):
@pytest.mark.parametrize("zero_stage", [1, 2, 3])
@pytest.mark.parametrize("use_cpu_offload", [True, False])
@pytest.mark.parametrize("hidden_dim", [9, 10])
class TestZeroStaticScale(DistributedTest):
world_size = 1
def test(self, zero_stage, use_cpu_offload, hidden_dim):
def test(self, zero_stage, use_cpu_offload, hidden_dim=4):
if use_cpu_offload and not deepspeed.ops.__compatible_ops__[CPUAdamBuilder.NAME]:
pytest.skip("cpu-adam is not compatible")

View File

@ -42,8 +42,8 @@ class TestPipeCifar10(DistributedTest):
skip_on_arch(min_arch=7)
config_dict = {
"train_batch_size": 16,
"train_micro_batch_size_per_gpu": 4,
"train_batch_size": 4,
"grandient_accumulation_steps": 1,
"steps_per_print": 20,
"optimizer": {
"type": "Adam",
@ -67,7 +67,7 @@ class TestPipeCifar10(DistributedTest):
}
topo = PipeTopo(**topo_config)
steps = 500 # must be >=100
steps = 100 # must be >=100
# Allocate model for consistent initial weights.
init_net = AlexNetPipe()

View File

@ -42,6 +42,7 @@ class TestDataLoaderDropLast(DistributedTest):
model=model,
training_data=train_dataset,
optimizer=optimizer)
training_dataloader.num_local_io_workers = 0 # We can't do nested mp.pool
for n, batch in enumerate(training_dataloader):
x = batch[0].to(get_accelerator().current_device_name())
y = batch[1].to(get_accelerator().current_device_name())

View File

@ -117,6 +117,7 @@ class TestConfigOptimizer(DistributedTest):
@pytest.mark.parametrize('grad_accum_dtype', [None, 'fp16', 'bf16', 'fp32'])
class TestOptimizerImplementation(DistributedTest):
world_size = 1
reuse_dist_env = True
def test(self, optimizer_extension, model_dtype, grad_accum_dtype):
if optimizer_extension == 'zero1':
@ -125,9 +126,9 @@ class TestOptimizerImplementation(DistributedTest):
zero_stage = 2
else:
zero_stage = 0
amp = True if optimizer_extension == 'amp' else False
fp16 = True if model_dtype == 'fp16' else False
bf16 = True if model_dtype == 'bf16' else False
amp = (optimizer_extension == 'amp')
fp16 = (model_dtype == 'fp16')
bf16 = (model_dtype == 'bf16')
# Skip checks
if bf16 and not bf16_required_version_check():
pytest.skip(

View File

@ -52,7 +52,7 @@ def dump_state_dict(model):
print(f"{name} {param.data}")
@pytest.mark.parametrize('zero_stage', [1, 2, 3])
@pytest.mark.parametrize("zero_stage", [1, 2, 3])
class TestZeroUnbalancedGradients(DistributedTest):
world_size = 1
@ -73,7 +73,7 @@ class TestZeroUnbalancedGradients(DistributedTest):
"fp16": {
"enabled": True,
"initial_scale_power": 8
}
},
}
hidden_dim = 4
@ -96,7 +96,7 @@ class TestZero3RepeatForwardLoop(DistributedTest):
"steps_per_print": 1,
"zero_optimization": {
"stage": zero_stage,
"stage3_param_persistence_threshold": 0
"stage3_param_persistence_threshold": 0,
},
"optimizer": {
"type": "Adam",
@ -107,7 +107,7 @@ class TestZero3RepeatForwardLoop(DistributedTest):
"fp16": {
"enabled": True,
"initial_scale_power": 8
}
},
}
hidden_dim = 4
@ -137,8 +137,8 @@ class TestZero3RepeatForwardLoop(DistributedTest):
# testing the fix https://github.com/microsoft/DeepSpeed/pull/1227
# also reproduces the https://github.com/microsoft/DeepSpeed/pull/1372
@pytest.mark.parametrize('zero_stage', [2, 3])
@pytest.mark.parametrize('freeze_params', [True, False])
@pytest.mark.parametrize("zero_stage", [2, 3])
@pytest.mark.parametrize("freeze_params", [True, False])
class TestZeroToFP32(DistributedTest):
world_size = 2
@ -151,7 +151,7 @@ class TestZeroToFP32(DistributedTest):
"steps_per_print": 1,
"zero_optimization": {
"stage": zero_stage,
"stage3_param_persistence_threshold": 0
"stage3_param_persistence_threshold": 0,
},
"optimizer": {
"type": "Adam",
@ -162,7 +162,7 @@ class TestZeroToFP32(DistributedTest):
"fp16": {
"enabled": True,
"initial_scale_power": 8
}
},
}
class MyModel(torch.nn.Module):
@ -227,7 +227,7 @@ class TestZeroToFP32(DistributedTest):
fp32_model = load_state_dict_from_zero_checkpoint(model.module, tmpdir)
fp32_state_dict = fp32_model.state_dict()
#dump_state_dict(fp32_model)
# dump_state_dict(fp32_model)
if dist.get_rank() == 0:
for name in orig_state_dict.keys():
@ -245,7 +245,7 @@ class TestZeroToFP32(DistributedTest):
"zero_allow_untested_optimizer": 1,
"zero_optimization": {
"stage": zero_stage,
"stage3_param_persistence_threshold": 0
"stage3_param_persistence_threshold": 0,
},
"optimizer": {
"type": "Adam",
@ -256,7 +256,7 @@ class TestZeroToFP32(DistributedTest):
"fp16": {
"enabled": True,
"initial_scale_power": 8
}
},
}
class MyModel(torch.nn.Module):
@ -293,10 +293,12 @@ class TestZeroToFP32(DistributedTest):
]
optim = torch.optim.SGD(optim_groups, lr=0.1)
model, _, _, _ = deepspeed.initialize(model=model,
model_parameters=model.parameters(),
optimizer=optim,
config=config_dict)
model, _, _, _ = deepspeed.initialize(
model=model,
model_parameters=model.parameters(),
optimizer=optim,
config=config_dict,
)
model.empty_partition_cache()
data_loader = random_dataloader(model=model, total_samples=16, hidden_dim=hidden_dim, device=model.device)
@ -312,7 +314,7 @@ class TestZeroToFP32(DistributedTest):
# make sure all sides saved it
dist.barrier()
#dump_state_dict(model)
# dump_state_dict(model)
orig_state_dict = {}
for name, param in model.module.named_parameters():
@ -330,7 +332,7 @@ class TestZeroToFP32(DistributedTest):
fp32_model = load_state_dict_from_zero_checkpoint(model.module, tmpdir)
fp32_state_dict = fp32_model.state_dict()
#dump_state_dict(fp32_model)
# dump_state_dict(fp32_model)
if dist.get_rank() == 0:
for name in orig_state_dict.keys():
@ -349,7 +351,7 @@ class TestIncorectAllgatherBucketSize(DistributedTest):
"steps_per_print": 1,
"zero_optimization": {
"stage": zero_stage,
"allgather_bucket_size": allgather_bucket_size
"allgather_bucket_size": allgather_bucket_size,
},
"optimizer": {
"type": "Adam",
@ -360,7 +362,7 @@ class TestIncorectAllgatherBucketSize(DistributedTest):
"fp16": {
"enabled": True,
"initial_scale_power": 8
}
},
}
hidden_dim = 4
@ -372,7 +374,7 @@ class TestIncorectAllgatherBucketSize(DistributedTest):
model, _, _, _ = deepspeed.initialize(config=config_dict,
model=model,
model_parameters=model.parameters())
assert "allgather_bucket_size must be a multiple of nccl_start_alignment_factor" in str(assertinfo)
assert ("allgather_bucket_size must be a multiple of nccl_start_alignment_factor" in str(assertinfo))
class TestPartitionNcclAlignment(DistributedTest):
@ -395,7 +397,7 @@ class TestPartitionNcclAlignment(DistributedTest):
"fp16": {
"enabled": True,
"initial_scale_power": 8
}
},
}
hidden_dim = 4
@ -405,7 +407,8 @@ class TestPartitionNcclAlignment(DistributedTest):
# get nccl all-gather send buffers alignment factor
nccl_start_alignment_factor = model.optimizer.nccl_start_alignment_factor
parallel_partitioned_bit16_groups = model.optimizer.parallel_partitioned_bit16_groups if zero_stage == 2 else model.optimizer.parallel_partitioned_fp16_groups
parallel_partitioned_bit16_groups = (model.optimizer.parallel_partitioned_bit16_groups
if zero_stage == 2 else model.optimizer.parallel_partitioned_fp16_groups)
for data_parallel_partitions in parallel_partitioned_bit16_groups:
for partition_id, partitioned_data in enumerate(data_parallel_partitions):
# verify that data partition start locations are 4-byte aligned
@ -458,9 +461,14 @@ class EltwiseMultiplicationTestNetwork_Dict(Module):
self.loss = L1Loss(reduction="none")
def forward(self, x: Tensor, y: Tensor, use_module_trace: bool, param_prefetching: bool) -> Dict[str, Tensor]:
_assert_partition_status(self,
{ZeroParamStatus.NOT_AVAILABLE, ZeroParamStatus.INFLIGHT, ZeroParamStatus.AVAILABLE}
if use_module_trace else {ZeroParamStatus.NOT_AVAILABLE})
_assert_partition_status(
self,
{
ZeroParamStatus.NOT_AVAILABLE,
ZeroParamStatus.INFLIGHT,
ZeroParamStatus.AVAILABLE,
} if use_module_trace else {ZeroParamStatus.NOT_AVAILABLE},
)
pre_layer_expected_states = {
ZeroParamStatus.INFLIGHT if param_prefetching else ZeroParamStatus.NOT_AVAILABLE,
@ -485,9 +493,14 @@ class EltwiseMultiplicationTestNetwork_Dict(Module):
loss = self.loss(y_hat, y)
_assert_partition_status(self,
{ZeroParamStatus.NOT_AVAILABLE, ZeroParamStatus.INFLIGHT, ZeroParamStatus.AVAILABLE}
if use_module_trace else {ZeroParamStatus.NOT_AVAILABLE})
_assert_partition_status(
self,
{
ZeroParamStatus.NOT_AVAILABLE,
ZeroParamStatus.INFLIGHT,
ZeroParamStatus.AVAILABLE,
} if use_module_trace else {ZeroParamStatus.NOT_AVAILABLE},
)
return {
"hidden1": hidden1,
@ -512,10 +525,12 @@ class EltwiseMultiplicationTestNetwork_NamedTuple(EltwiseMultiplicationTestNetwo
def forward(self, *args, **kwargs) -> EltwiseMultiplicationNamedTuple:
outputs_dicts = super().forward(*args, **kwargs)
return EltwiseMultiplicationNamedTuple(hidden1=outputs_dicts['hidden1'],
hidden2=outputs_dicts['hidden2'],
y_hat=outputs_dicts['y_hat'],
loss=outputs_dicts['loss'])
return EltwiseMultiplicationNamedTuple(
hidden1=outputs_dicts["hidden1"],
hidden2=outputs_dicts["hidden2"],
y_hat=outputs_dicts["y_hat"],
loss=outputs_dicts["loss"],
)
@staticmethod
def to_dict(outputs: EltwiseMultiplicationNamedTuple) -> Dict[str, Tensor]:
@ -527,18 +542,20 @@ class EltwiseMultiplicationTestNetwork_NamedTuple(EltwiseMultiplicationTestNetwo
}
EltwiseMultiplication_namedtuple = namedtuple('EltwiseMultiplication_namedtuple',
['hidden1', 'hidden2', 'y_hat', 'loss'])
EltwiseMultiplication_namedtuple = namedtuple("EltwiseMultiplication_namedtuple",
["hidden1", "hidden2", "y_hat", "loss"])
class EltwiseMultiplicationTestNetwork_namedtuple(EltwiseMultiplicationTestNetwork_Dict):
def forward(self, *args, **kwargs) -> EltwiseMultiplication_namedtuple:
outputs_dicts = super().forward(*args, **kwargs)
return EltwiseMultiplication_namedtuple(hidden1=outputs_dicts['hidden1'],
hidden2=outputs_dicts['hidden2'],
y_hat=outputs_dicts['y_hat'],
loss=outputs_dicts['loss'])
return EltwiseMultiplication_namedtuple(
hidden1=outputs_dicts["hidden1"],
hidden2=outputs_dicts["hidden2"],
y_hat=outputs_dicts["y_hat"],
loss=outputs_dicts["loss"],
)
@staticmethod
def to_dict(outputs: EltwiseMultiplicationNamedTuple) -> Dict[str, Tensor]:
@ -554,7 +571,12 @@ class EltwiseMultiplicationTestNetwork_Tuple(EltwiseMultiplicationTestNetwork_Di
def forward(self, *args, **kwargs) -> Tuple[Tensor, Tensor, Tensor, Tensor]:
outputs_dicts = super().forward(*args, **kwargs)
return (outputs_dicts['hidden1'], outputs_dicts['hidden2'], outputs_dicts['y_hat'], outputs_dicts['loss'])
return (
outputs_dicts["hidden1"],
outputs_dicts["hidden2"],
outputs_dicts["y_hat"],
outputs_dicts["loss"],
)
@staticmethod
def to_dict(outputs: Tuple[Tensor, Tensor, Tensor, Tensor]) -> Dict[str, Tensor]:
@ -570,7 +592,12 @@ class EltwiseMultiplicationTestNetwork_List(EltwiseMultiplicationTestNetwork_Dic
def forward(self, *args, **kwargs) -> List[Tensor]:
outputs_dicts = super().forward(*args, **kwargs)
return [outputs_dicts['hidden1'], outputs_dicts['hidden2'], outputs_dicts['y_hat'], outputs_dicts['loss']]
return [
outputs_dicts["hidden1"],
outputs_dicts["hidden2"],
outputs_dicts["y_hat"],
outputs_dicts["loss"],
]
@staticmethod
def to_dict(outputs: List[Tensor]) -> Dict[str, Tensor]:
@ -582,31 +609,55 @@ class EltwiseMultiplicationTestNetwork_List(EltwiseMultiplicationTestNetwork_Dic
}
@pytest.mark.parametrize("param_persistence_threshold", [0, 10])
@pytest.mark.parametrize("fp16_enabled", [True, False])
@pytest.mark.parametrize("contiguous_gradients", [True, False])
@pytest.mark.parametrize("offload_optimizer", [True, False])
@pytest.mark.parametrize("zero_grad", [True, False])
@pytest.mark.parametrize("prefetching", [True, False])
@pytest.mark.parametrize("reduce_scatter", [True, False])
@pytest.mark.parametrize("model_class", [
EltwiseMultiplicationTestNetwork_Dict, EltwiseMultiplicationTestNetwork_NamedTuple,
EltwiseMultiplicationTestNetwork_namedtuple, EltwiseMultiplicationTestNetwork_Tuple,
EltwiseMultiplicationTestNetwork_List
])
class TestZero3ParamPartitioningBase(DistributedTest):
world_size = 2
def test(
@pytest.mark.parametrize("param_persistence_threshold", [0, 10])
def test_param_persistence_threshold(self, param_persistence_threshold):
self._test(param_persistence_threshold=param_persistence_threshold)
@pytest.mark.parametrize("fp16_enabled", [True, False])
def test_fp16_enabled(self, fp16_enabled):
self._test(fp16_enabled=fp16_enabled)
@pytest.mark.parametrize("contiguous_gradients", [True, False])
def test_contiguous_gradients(self, contiguous_gradients):
self._test(contiguous_gradients=contiguous_gradients)
@pytest.mark.parametrize("offload_optimizer", [True, False])
def test_offload_optimizer(self, offload_optimizer):
self._test(offload_optimizer=offload_optimizer)
@pytest.mark.parametrize("zero_grad", [True, False])
def test_zero_grad(self, zero_grad):
self._test(zero_grad=zero_grad)
@pytest.mark.parametrize("prefetching", [True, False])
def test_prefetching(self, prefetching):
self._test(prefetching=prefetching)
@pytest.mark.parametrize("reduce_scatter", [True, False])
def test_reduce_scatter(self, reduce_scatter):
self._test(reduce_scatter=reduce_scatter)
@pytest.mark.parametrize("model_class", [
EltwiseMultiplicationTestNetwork_Dict, EltwiseMultiplicationTestNetwork_NamedTuple,
EltwiseMultiplicationTestNetwork_namedtuple, EltwiseMultiplicationTestNetwork_Tuple,
EltwiseMultiplicationTestNetwork_List
])
def test_model_class(self, model_class):
self._test(model_class=model_class)
def _test(
self,
param_persistence_threshold: int,
fp16_enabled: bool,
contiguous_gradients: bool,
offload_optimizer: bool,
zero_grad: bool,
prefetching: bool,
reduce_scatter: bool,
model_class: EltwiseMultiplicationTestNetwork_Dict,
param_persistence_threshold: int = 0,
fp16_enabled: bool = False,
contiguous_gradients: bool = False,
offload_optimizer: bool = False,
zero_grad: bool = False,
prefetching: bool = False,
reduce_scatter: bool = False,
model_class: EltwiseMultiplicationTestNetwork_Dict = EltwiseMultiplicationTestNetwork_Dict,
) -> None:
if offload_optimizer and not contiguous_gradients:
return
@ -624,18 +675,18 @@ class TestZero3ParamPartitioningBase(DistributedTest):
"stage3_param_persistence_threshold": param_persistence_threshold,
"contiguous_gradients": contiguous_gradients,
"stage3_prefetch_bucket_size": prefetch_bucket_size if prefetching else 0,
"reduce_scatter": reduce_scatter
"reduce_scatter": reduce_scatter,
},
"optimizer": {
"type": "Adam",
"params": {
"lr": 1.
"lr": 1.0
}
},
"fp16": {
"enabled": fp16_enabled,
"loss_scale": 1.,
}
"loss_scale": 1.0,
},
}
if offload_optimizer:
@ -649,9 +700,11 @@ class TestZero3ParamPartitioningBase(DistributedTest):
weight.ds_tensor.data = torch.full_like(weight.ds_tensor.data, (i + 1) * (1 + dist.get_rank()))
def create_tensor(vals, dtype: torch.dtype = None) -> Tensor:
return torch.as_tensor(vals,
dtype=dtype or (torch.float16 if fp16_enabled else torch.float32),
device=ds_engine.device)
return torch.as_tensor(
vals,
dtype=dtype or (torch.float16 if fp16_enabled else torch.float32),
device=ds_engine.device,
)
expected_hidden1 = create_tensor([
[1, 1, 1, 1, 1],
@ -672,8 +725,16 @@ class TestZero3ParamPartitioningBase(DistributedTest):
for train_iter in range(3):
activations = ds_engine(
x=torch.ones((m, n), dtype=torch.float16 if fp16_enabled else torch.float32, device=ds_engine.device),
y=torch.ones((m, n), dtype=torch.float16 if fp16_enabled else torch.float32, device=ds_engine.device),
x=torch.ones(
(m, n),
dtype=torch.float16 if fp16_enabled else torch.float32,
device=ds_engine.device,
),
y=torch.ones(
(m, n),
dtype=torch.float16 if fp16_enabled else torch.float32,
device=ds_engine.device,
),
use_module_trace=train_iter > 0,
param_prefetching=prefetching and train_iter > 0,
)
@ -708,21 +769,33 @@ class TestZero3ParamPartitioningBase(DistributedTest):
grad_multiplier = 1 if zero_grad else (train_iter + 1)
if dist.get_rank() == 0:
assert torch.allclose(dloss_wrt_layer3.to(get_accelerator().device_name()),
grad_multiplier * create_tensor([2] * 8, torch.float))
assert torch.allclose(dloss_wrt_layer2.to(get_accelerator().device_name()),
grad_multiplier * create_tensor([3 * 1] * 8, torch.float))
assert torch.allclose(dloss_wrt_layer1.to(get_accelerator().device_name()),
grad_multiplier * create_tensor([3 * 2 * 1] * 8, torch.float))
assert torch.allclose(
dloss_wrt_layer3.to(get_accelerator().device_name()),
grad_multiplier * create_tensor([2] * 8, torch.float),
)
assert torch.allclose(
dloss_wrt_layer2.to(get_accelerator().device_name()),
grad_multiplier * create_tensor([3 * 1] * 8, torch.float),
)
assert torch.allclose(
dloss_wrt_layer1.to(get_accelerator().device_name()),
grad_multiplier * create_tensor([3 * 2 * 1] * 8, torch.float),
)
elif dist.get_rank() == 1:
# parameters dont split evenly across ranks so rank 1 has a zero-padded
# partition
assert torch.allclose(dloss_wrt_layer3.to(get_accelerator().device_name()),
grad_multiplier * create_tensor(([8] * 7) + [0], torch.float))
assert torch.allclose(dloss_wrt_layer2.to(get_accelerator().device_name()),
grad_multiplier * create_tensor(([6 * 2] * 7) + [0], torch.float))
assert torch.allclose(dloss_wrt_layer1.to(get_accelerator().device_name()),
grad_multiplier * create_tensor(([6 * 4 * 1] * 7) + [0], torch.float))
assert torch.allclose(
dloss_wrt_layer3.to(get_accelerator().device_name()),
grad_multiplier * create_tensor(([8] * 7) + [0], torch.float),
)
assert torch.allclose(
dloss_wrt_layer2.to(get_accelerator().device_name()),
grad_multiplier * create_tensor(([6 * 2] * 7) + [0], torch.float),
)
assert torch.allclose(
dloss_wrt_layer1.to(get_accelerator().device_name()),
grad_multiplier * create_tensor(([6 * 4 * 1] * 7) + [0], torch.float),
)
else:
raise RuntimeError("test has world size of two")
@ -776,13 +849,13 @@ class TestZero3ParamPartitioningLargeParam(DistributedTest):
"optimizer": {
"type": "Adam",
"params": {
"lr": 1.
"lr": 1.0
}
},
"fp16": {
"enabled": True,
"loss_scale": 1.,
}
"loss_scale": 1.0,
},
}
with deepspeed.zero.Init(mem_efficient_linear=False, enabled=init_context_manager):
model = LargeParamModel()
@ -794,26 +867,27 @@ class TestZero3ParamPartitioningLargeParam(DistributedTest):
partition_sz = math.ceil(param_sz / self.world_size)
for rank_idx, start_idx in enumerate(range(0, param_sz, partition_sz)):
activation_from_partition = activation[start_idx:start_idx + partition_sz]
assert torch.allclose(activation_from_partition, torch.full_like(activation_from_partition, rank_idx))
assert torch.allclose(
activation_from_partition,
torch.full_like(activation_from_partition, rank_idx),
)
ds_engine.backward(activation.sum())
ds_engine.allreduce_gradients()
avgd_gradients = ds_engine.optimizer.averaged_gradients
assert set(avgd_gradients.keys()) == {0}, "should only have one parameter group"
weight_gradient, = avgd_gradients[0]
(weight_gradient, ) = avgd_gradients[0]
expected_weight_gradient = (train_iter + 1) * torch.full_like(weight_gradient, 1)
assert torch.allclose(weight_gradient, expected_weight_gradient)
@pytest.mark.parametrize("param_sz", [100, 1_000, 10_000])
@pytest.mark.parametrize("n_layers", [100, 1_000])
@pytest.mark.parametrize("init_context_manager", [True, False])
class TestZero3ParamPartitioningManyParams(DistributedTest):
world_size = 4
world_size = 2
def test(self, param_sz: int, n_layers: int, init_context_manager: bool) -> None:
def test(self, init_context_manager: bool, param_sz: int = 100, n_layers: int = 100) -> None:
class ManyParamModel(Module):
@ -854,13 +928,13 @@ class TestZero3ParamPartitioningManyParams(DistributedTest):
"optimizer": {
"type": "Adam",
"params": {
"lr": 1.
"lr": 1.0
}
},
"fp16": {
"enabled": True,
"loss_scale": 1.,
}
"loss_scale": 1.0,
},
}
with deepspeed.zero.Init(config=ds_cfg, mem_efficient_linear=False, enabled=init_context_manager):
@ -923,20 +997,23 @@ class TestZero3InitForParentWeightInitialization(DistributedTest):
"optimizer": {
"type": "Adam",
"params": {
"lr": 1.
"lr": 1.0
}
},
"fp16": {
"enabled": True,
"loss_scale": 1.,
}
"loss_scale": 1.0,
},
}
with deepspeed.zero.Init(config=ds_cfg, mem_efficient_linear=False, enabled=True):
model = ModelWhereParentInitializesChildWeights()
assert model.linear.weight.ds_tensor.numel() == math.ceil(12 / self.world_size)
assert torch.allclose(model.linear.weight.ds_tensor, torch.full_like(model.linear.weight.ds_tensor, 1))
assert torch.allclose(
model.linear.weight.ds_tensor,
torch.full_like(model.linear.weight.ds_tensor, 1),
)
@pytest.mark.skip("not working")
@ -946,17 +1023,29 @@ class TestZero3InitForParentWeightInitialization(DistributedTest):
@pytest.mark.parametrize("zero_grad", [True, False])
@pytest.mark.parametrize("prefetching", [True, False])
@pytest.mark.parametrize("reduce_scatter", [True, False])
@pytest.mark.parametrize("model_class", [
EltwiseMultiplicationTestNetwork_Dict, EltwiseMultiplicationTestNetwork_NamedTuple,
EltwiseMultiplicationTestNetwork_namedtuple, EltwiseMultiplicationTestNetwork_Tuple,
EltwiseMultiplicationTestNetwork_List
])
@pytest.mark.parametrize(
"model_class",
[
EltwiseMultiplicationTestNetwork_Dict,
EltwiseMultiplicationTestNetwork_NamedTuple,
EltwiseMultiplicationTestNetwork_namedtuple,
EltwiseMultiplicationTestNetwork_Tuple,
EltwiseMultiplicationTestNetwork_List,
],
)
class TestZero3ParamPartitioningBaseBF16(DistributedTest):
world_size = 2
def test(self, param_persistence_threshold: int, contiguous_gradients: bool, offload_optimizer: bool,
zero_grad: bool, prefetching: bool, reduce_scatter: bool,
model_class: EltwiseMultiplicationTestNetwork_Dict) -> None:
def test(
self,
param_persistence_threshold: int,
contiguous_gradients: bool,
offload_optimizer: bool,
zero_grad: bool,
prefetching: bool,
reduce_scatter: bool,
model_class: EltwiseMultiplicationTestNetwork_Dict,
) -> None:
if offload_optimizer and not contiguous_gradients:
return
@ -973,18 +1062,18 @@ class TestZero3ParamPartitioningBaseBF16(DistributedTest):
"stage3_param_persistence_threshold": param_persistence_threshold,
"contiguous_gradients": contiguous_gradients,
"stage3_prefetch_bucket_size": prefetch_bucket_size if prefetching else 0,
"reduce_scatter": reduce_scatter
"reduce_scatter": reduce_scatter,
},
"optimizer": {
"type": "Adam",
"params": {
"lr": 1.
"lr": 1.0
}
},
"bf16": {
"enabled": True,
"loss_scale": 1.,
}
"loss_scale": 1.0,
},
}
if offload_optimizer:
@ -1055,21 +1144,33 @@ class TestZero3ParamPartitioningBaseBF16(DistributedTest):
grad_multiplier = 1 if zero_grad else (train_iter + 1)
if dist.get_rank() == 0:
assert torch.allclose(dloss_wrt_layer3.to(get_accelerator().device_name()),
grad_multiplier * create_tensor([2] * 8).to(expected_grad_dtype))
assert torch.allclose(dloss_wrt_layer2.to(get_accelerator().device_name()),
grad_multiplier * create_tensor([3 * 1] * 8).to(expected_grad_dtype))
assert torch.allclose(dloss_wrt_layer1.to(get_accelerator().device_name()),
grad_multiplier * create_tensor([3 * 2 * 1] * 8).to(expected_grad_dtype))
assert torch.allclose(
dloss_wrt_layer3.to(get_accelerator().device_name()),
grad_multiplier * create_tensor([2] * 8).to(expected_grad_dtype),
)
assert torch.allclose(
dloss_wrt_layer2.to(get_accelerator().device_name()),
grad_multiplier * create_tensor([3 * 1] * 8).to(expected_grad_dtype),
)
assert torch.allclose(
dloss_wrt_layer1.to(get_accelerator().device_name()),
grad_multiplier * create_tensor([3 * 2 * 1] * 8).to(expected_grad_dtype),
)
elif dist.get_rank() == 1:
# parameters dont split evenly across ranks so rank 1 has a zero-padded
# partition
assert torch.allclose(dloss_wrt_layer3.to(get_accelerator().device_name()),
grad_multiplier * create_tensor(([8] * 7) + [0]).to(expected_grad_dtype))
assert torch.allclose(dloss_wrt_layer2.to(get_accelerator().device_name()),
grad_multiplier * create_tensor(([6 * 2] * 7) + [0]).to(expected_grad_dtype))
assert torch.allclose(dloss_wrt_layer1.to(get_accelerator().device_name()),
grad_multiplier * create_tensor(([6 * 4 * 1] * 7) + [0]).to(expected_grad_dtype))
assert torch.allclose(
dloss_wrt_layer3.to(get_accelerator().device_name()),
grad_multiplier * create_tensor(([8] * 7) + [0]).to(expected_grad_dtype),
)
assert torch.allclose(
dloss_wrt_layer2.to(get_accelerator().device_name()),
grad_multiplier * create_tensor(([6 * 2] * 7) + [0]).to(expected_grad_dtype),
)
assert torch.allclose(
dloss_wrt_layer1.to(get_accelerator().device_name()),
grad_multiplier * create_tensor(([6 * 4 * 1] * 7) + [0]).to(expected_grad_dtype),
)
else:
raise RuntimeError("test has world size of two")
@ -1104,7 +1205,7 @@ class TestZeroOffloadStage1(DistributedTest):
"offload_optimizer": {
"device": "cpu"
}
}
},
}
hidden_dim = 10
@ -1118,7 +1219,7 @@ class TestZeroOffloadStage1(DistributedTest):
model.step()
@pytest.mark.parametrize('return_type', [tuple, list, dict])
@pytest.mark.parametrize("return_type", [tuple, list, dict])
class TestZero3DictFwd(DistributedTest):
world_size = 1
@ -1137,7 +1238,7 @@ class TestZero3DictFwd(DistributedTest):
},
"zero_optimization": {
"stage": 3
}
},
}
hidden_dim = 10
@ -1152,7 +1253,7 @@ class TestZero3DictFwd(DistributedTest):
x = self.l1(x)
loss = self.cel(x, y)
if return_type == dict:
val = {'a': x, 'loss': loss, 'b': 1, 'c': None}
val = {"a": x, "loss": loss, "b": 1, "c": None}
elif return_type == list:
val = [x, loss]
elif return_type == tuple:
@ -1170,14 +1271,14 @@ class TestZero3DictFwd(DistributedTest):
for n, batch in enumerate(data_loader):
loss = model(batch[0], batch[1])
if return_type == dict:
loss = loss['loss']
loss = loss["loss"]
else:
loss = loss[1]
model.backward(loss)
model.step()
@pytest.mark.parametrize('zero_stage', [1, 2, 3])
@pytest.mark.parametrize("zero_stage", [1, 2, 3])
class TestZeroAdamOptimizerStepCount(DistributedTest):
world_size = 1
@ -1201,7 +1302,7 @@ class TestZeroAdamOptimizerStepCount(DistributedTest):
"fp16": {
"enabled": True,
"initial_scale_power": 8
}
},
}
hidden_dim = 4
@ -1221,13 +1322,13 @@ class TestZeroAdamOptimizerStepCount(DistributedTest):
for sub_group_id, _ in enumerate(optimizer.fp16_groups):
fp32_param = optimizer.fp32_partitioned_groups_flat[sub_group_id]
state = optimizer.optimizer.state[fp32_param]
step_counts.append(state['step'])
step_counts.append(state["step"])
assert all(step == step_counts[0] for step in step_counts)
elif zero_stage == 1 or zero_stage == 2:
for param_group in optimizer.optimizer.param_groups:
for param in param_group['params']:
for param in param_group["params"]:
state = optimizer.optimizer.state[param]
step_counts.append(state['step'])
step_counts.append(state["step"])
assert all(step == step_counts[0] for step in step_counts)
@ -1249,7 +1350,7 @@ class TestZeroFrozenWeights(DistributedTest):
},
"zero_optimization": {
"stage": 3
}
},
}
hidden_dim = 10
@ -1287,7 +1388,7 @@ class TestZeroFrozenWeights(DistributedTest):
model.step()
@pytest.mark.parametrize('force_ds_optim', [True, False])
@pytest.mark.parametrize("force_ds_optim", [True, False])
class TestZeroOffloadOptim(DistributedTest):
world_size = 1
@ -1320,7 +1421,7 @@ class TestZeroOffloadOptim(DistributedTest):
model, _, _, _ = deepspeed.initialize(model=model, optimizer=optimizer, config=config_dict)
@pytest.mark.parametrize('training', [True, False])
@pytest.mark.parametrize("training", [True, False])
class TestZeroPartitionCache(DistributedTest):
world_size = 1
@ -1334,8 +1435,8 @@ class TestZeroPartitionCache(DistributedTest):
},
"zero_optimization": {
"stage": 3,
"stage3_param_persistence_threshold": hidden_dim
}
"stage3_param_persistence_threshold": hidden_dim,
},
}
if training:
config_dict["optimizer"] = {"type": "Adam"}
@ -1346,11 +1447,13 @@ class TestZeroPartitionCache(DistributedTest):
model, _, _, _ = deepspeed.initialize(model=model, config=config_dict)
dtype = torch.half
data_loader = random_dataloader(model=model,
total_samples=6,
hidden_dim=hidden_dim,
device=model.device,
dtype=dtype)
data_loader = random_dataloader(
model=model,
total_samples=6,
hidden_dim=hidden_dim,
device=model.device,
dtype=dtype,
)
for _, batch in enumerate(data_loader):
loss = model(batch[0], batch[1])

View File

@ -68,11 +68,15 @@ def run_fragmented_model(model, config_dict, hidden_dim, dtype):
validate_full_tensors(model)
model.step()
# Needed in ZeRO 3. Not doing so can give memory leak
model.destroy()
@pytest.mark.parametrize('frozen_weights', [True, False])
class TestTensorFragment(DistributedTest):
# Need multiple gpus to test possible hanging
world_size = 2
reuse_dist_env = True
@pytest.mark.parametrize('zero_stage', [1, 2, 3])
@pytest.mark.parametrize('offload_device', [OffloadDeviceEnum.none, OffloadDeviceEnum.cpu, OffloadDeviceEnum.nvme])