Mirror of https://github.com/pytorch/pytorch.git, synced 2025-11-18 17:45:09 +08:00
Reduce usage of sys.exit to skip tests

Factor out `exit_if_lt_x_cuda_devs`; replace checks with `unittest.skip*` where possible.
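The pattern being replaced exits the worker process with a sentinel exit code, which the multiprocess harness translates back into a skip; the `unittest.skip*` decorators instead mark the test as skipped at decoration time, with a readable message in the report. A minimal sketch of the two styles (the test body is hypothetical; `TEST_SKIPS` and `skip_if_lt_x_gpu` are the names used in this diff):

    # Before: in-body check, process exits with a sentinel code.
    def test_allreduce(self):
        if torch.cuda.device_count() < self.world_size:
            sys.exit(TEST_SKIPS[f"multi-gpu-{self.world_size}"].exit_code)
        ...

    # After: decorator-based skip, reported as "skipped" by the runner.
    @skip_if_lt_x_gpu(2)
    def test_allreduce(self):
        ...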
@@ -25,7 +25,7 @@ from torch.testing._internal.common_distributed import (
     DistributedTestBase,
     MultiThreadedTestCase,
     requires_accelerator_dist_backend,
-    TEST_SKIPS,
+    skip_if_no_gpu,
 )
 from torch.testing._internal.common_utils import (
     instantiate_parametrized_tests,
@@ -486,10 +486,8 @@ def with_comms(func=None):

     @wraps(func)
     def wrapper(self, *args, **kwargs):
-        if (
-            BACKEND == dist.Backend.NCCL or BACKEND == dist.Backend.XCCL
-        ) and torch.accelerator.device_count() < self.world_size:
-            sys.exit(TEST_SKIPS[f"multi-gpu-{self.world_size}"].exit_code)
+        if BACKEND in (dist.Backend.NCCL, dist.Backend.XCCL):
+            exit_if_lt_x_accelerators(self.world_size)

         kwargs["device"] = DEVICE
         self.pg = self.create_pg(device=DEVICE)
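`exit_if_lt_x_accelerators` is only called, never defined, in the hunks shown here; judging from the inline check it replaces, a plausible definition mirrors the CUDA helper introduced in a later hunk but counts devices via `torch.accelerator`:

    # Inferred sketch, not part of this diff: device-type-agnostic variant
    # of exit_if_lt_x_cuda_devs, reconstructed from the removed inline check.
    def exit_if_lt_x_accelerators(x):
        if torch.accelerator.device_count() < x:
            sys.exit(TEST_SKIPS[f"multi-gpu-{x}"].exit_code)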
@@ -502,9 +500,9 @@ def with_comms(func=None):


 class TestCollectivesWithDistributedBackend(DistributedTestBase):
+    @skip_if_no_gpu
     @with_comms()
     def test_all_gather_into_tensor_coalesced(self, device):
-        exit_if_lt_x_accelerators(self.world_size)
         tensors = [
             torch.ones([4], device=device),
             torch.ones([4], device=device) + 1,
@@ -576,9 +574,8 @@ class TestCollectivesWithDistributedBackend(DistributedTestBase):
         compiled_allreduce(torch.randn(8, device=device), self.pg)

     @unittest.skipIf(not HAS_GPU, "Inductor+gpu needs triton and recent GPU arch")
+    @skip_if_no_gpu
     def test_tracing_with_fakepg(self, device=DEVICE):
-        exit_if_lt_x_accelerators(self.world_size)
-
         def allreduce(t, pg):
             return ft_c.all_reduce(t, "sum", pg)
@@ -619,9 +616,9 @@ class TestDistributedBackendCollectivesWithWorldSize4(
     def world_size(self):
         return 4

+    @skip_if_no_gpu
     @with_comms()
     def test_permute_tensor_with_sub_group(self, device):
-        exit_if_lt_x_accelerators(self.world_size)
         mesh_dim_names = ["dp", "tp"]

         mesh_2d = dt.init_device_mesh(
@@ -122,6 +122,12 @@ def requires_ddp_rank(device):
     return device in DDP_RANK_DEVICES


+def exit_if_lt_x_cuda_devs(x):
+    """Exit process unless at least the given number of CUDA devices are available"""
+    if torch.cuda.device_count() < x:
+        sys.exit(TEST_SKIPS[f"multi-gpu-{x}"].exit_code)
+
+
 # allows you to check for multiple accelerator irrespective of device type
 # to add new device types to this check simply follow the same format
 # and append an elif with the conditional and appropriate device count function for your new device
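sys.exit survives in this helper because the check runs inside worker processes spawned by MultiProcessTestCase: a unittest skip raised in the child would never reach the parent's result collector, whereas the parent does map the TEST_SKIPS exit codes back to skips. A hypothetical worker-side call site:

    class TwoDeviceTest(MultiProcessTestCase):  # hypothetical test case
        @property
        def world_size(self):
            return 2

        def test_pairwise_exchange(self):
            # Executed in a spawned worker: exiting with the skip code
            # lets the parent process record this test as skipped.
            exit_if_lt_x_cuda_devs(self.world_size)
            ...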
@@ -136,8 +142,6 @@ def skip_if_no_gpu(func):

     @wraps(func)
     def wrapper(*args, **kwargs):
-        if not (TEST_CUDA or TEST_HPU or TEST_XPU):
-            sys.exit(TEST_SKIPS["no_cuda"].exit_code)
         world_size = int(os.environ["WORLD_SIZE"])
         if TEST_CUDA and torch.cuda.device_count() < world_size:
             sys.exit(TEST_SKIPS[f"multi-gpu-{world_size}"].exit_code)
@@ -148,7 +152,9 @@ def skip_if_no_gpu(func):

         return func(*args, **kwargs)

-    return wrapper
+    return unittest.skipUnless(
+        TEST_CUDA or TEST_HPU or TEST_XPU, TEST_SKIPS["no_cuda"].message
+    )(wrapper)


 # TODO (kwen2501): what is the purpose of this decorator? Tests with this
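For reference, unittest.skipUnless(cond, msg) returns the decorated callable unchanged when cond is truthy and replaces it with a skip marker otherwise, so wrapping `wrapper` this way moves only the "no accelerator at all" case to decoration time; the per-world-size check still runs in the worker, since WORLD_SIZE is only known there. A self-contained illustration (not part of the diff):

    import unittest

    class SkipDemo(unittest.TestCase):
        @unittest.skipUnless(False, "resource missing")
        def test_never_runs(self):
            self.fail("unreachable")

        @unittest.skipUnless(True, "message unused when condition holds")
        def test_runs(self):
            self.assertEqual(1 + 1, 2)

    if __name__ == "__main__":
        unittest.main()  # test_never_runs reported as skipped, test_runs as ok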
@@ -180,23 +186,20 @@ def skip_if_odd_worldsize(func):


 def require_n_gpus_for_nccl_backend(n, backend):
-    return skip_if_lt_x_gpu(n) if backend == "nccl" else unittest.skipIf(False, None)
+    return (
+        skip_if_lt_x_gpu(n)
+        if backend == "nccl"
+        else unittest.skipIf(False, TEST_SKIPS[f"multi-gpu-{n}"].message)
+    )


 def import_transformers_or_skip():
-    def decorator(func):
-        @wraps(func)
-        def wrapper(*args, **kwargs):
-            try:
-                from transformers import AutoModelForMaskedLM, BertConfig  # noqa: F401
-
-                return func(*args, **kwargs)
-            except ImportError:
-                sys.exit(TEST_SKIPS["importerror"].exit_code)
-
-        return wrapper
-
-    return decorator
+    try:
+        from transformers import AutoModelForMaskedLM, BertConfig  # noqa: F401
+
+        return unittest.skipIf(False, "Dummy")
+    except ImportError:
+        return unittest.skip(TEST_SKIPS["importerror"].message)


 def at_least_x_gpu(x):
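Note the semantic shift in import_transformers_or_skip: the transformers import now runs once, when the decorator factory is called, rather than on every test invocation, and a missing package becomes a reported skip instead of a process exit (unittest.skipIf(False, ...) serves as the no-op branch). The same pattern generalizes to any optional dependency; a sketch with a hypothetical helper name:

    import importlib
    import unittest

    def require_optional_module(modname, skip_msg):
        """Probe an optional dependency once; return a no-op or skip decorator."""
        try:
            importlib.import_module(modname)
            return unittest.skipIf(False, "dependency present")  # no-op
        except ImportError:
            return unittest.skip(skip_msg)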
@@ -7,8 +7,9 @@ import torch
 import torch.distributed as dist
 from torch.distributed import rpc
 from torch.testing._internal.common_distributed import (
+    exit_if_lt_x_cuda_devs,
     MultiProcessTestCase,
-    TEST_SKIPS,
+    require_n_gpus_for_nccl_backend,
     tp_transports,
 )
@@ -94,10 +95,10 @@ def with_comms(func=None, init_rpc=True, backend="nccl"):

     @wraps(func)
     def wrapper(self, *args, **kwargs):
-        if backend == "nccl" and torch.cuda.device_count() < self.world_size:
-            sys.exit(TEST_SKIPS[f"multi-gpu-{self.world_size}"].exit_code)
+        if backend == "nccl":
+            exit_if_lt_x_cuda_devs(self.world_size)
         self.init_comms(init_rpc=init_rpc, backend=backend)
         func(self, *args, **kwargs)
         self.destroy_comms(destroy_rpc=init_rpc)

-    return wrapper
+    return require_n_gpus_for_nccl_backend(1, backend)(wrapper)
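The new return value layers the two mechanisms: require_n_gpus_for_nccl_backend(1, backend) contributes a decoration-time skip when NCCL is requested on a machine with no GPU, while the world-size check stays inside the wrapper as a worker-side exit. A condensed sketch of the post-change control flow (paraphrasing this hunk, not verbatim source):

    def with_comms(func=None, init_rpc=True, backend="nccl"):
        def decorator(func):
            @wraps(func)
            def wrapper(self, *args, **kwargs):
                if backend == "nccl":
                    exit_if_lt_x_cuda_devs(self.world_size)  # worker-side
                self.init_comms(init_rpc=init_rpc, backend=backend)
                func(self, *args, **kwargs)
                self.destroy_comms(destroy_rpc=init_rpc)

            # decoration-time: skip outright if NCCL is requested but no GPU
            return require_n_gpus_for_nccl_backend(1, backend)(wrapper)

        return decorator if func is None else decorator(func)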
@@ -5,7 +5,6 @@
 import contextlib
 import functools
 import itertools
-import sys
 import types
 from collections.abc import Callable, Iterator, Sequence
 from dataclasses import dataclass
@@ -40,12 +39,12 @@ from torch.distributed.tensor.parallel import (
     SequenceParallel,
 )
 from torch.testing._internal.common_distributed import (
+    exit_if_lt_x_cuda_devs,
     MultiProcContinuousTest,
     MultiProcessTestCase,
     MultiThreadedTestCase,
     run_subtests,
     skip_if_lt_x_gpu,
-    TEST_SKIPS,
 )
 from torch.testing._internal.common_utils import (
     TEST_CUDA,
@@ -393,8 +392,8 @@ class DTensorTestBase(MultiProcessTestCase):
         return init_device_mesh(self.device_type, (self.world_size,))

     def init_pg(self, eager_init, backend: Optional[str] = None) -> None:
-        if "nccl" in self.backend and torch.cuda.device_count() < self.world_size:
-            sys.exit(TEST_SKIPS[f"multi-gpu-{self.world_size}"].exit_code)
+        if "nccl" in self.backend:
+            exit_if_lt_x_cuda_devs(self.world_size)

         curr_backend = dist.get_default_backend_for_device(self.device_type)
@@ -60,6 +60,7 @@ from torch.testing._internal.common_distributed import (
     captured_output,
     cleanup_temp_dir,
     DistTestCases,
+    exit_if_lt_x_cuda_devs,
     init_multigpu_helper,
     initialize_temp_directories,
     MultiProcessTestCase,
@@ -602,10 +603,8 @@ class TestDistBackend(MultiProcessTestCase):
         self.rank = rank
         self.file_name = file_name

-        if torch.cuda.is_available() and torch.cuda.device_count() < int(
-            self.world_size
-        ):
-            sys.exit(TEST_SKIPS[f"multi-gpu-{self.world_size}"].exit_code)
+        if torch.cuda.is_available():
+            exit_if_lt_x_cuda_devs(int(self.world_size))
         try:
             pg_timeout_seconds = CUSTOM_PG_TIMEOUT.get(test_name, default_pg_timeout)
             timeout = timedelta(seconds=pg_timeout_seconds)