mirror of
https://github.com/pytorch/pytorch.git
synced 2025-10-21 13:44:15 +08:00
Summary: Pull Request resolved: https://github.com/pytorch/pytorch/pull/44735 Reviewed By: mruberry Differential Revision: D23731306 Pulled By: ezyang fbshipit-source-id: 0ba009a99e475ddbe22981be8ac636f8a1c8b02f
106 lines
3.2 KiB
Python
106 lines
3.2 KiB
Python
import os
|
|
import sys
|
|
import tempfile
|
|
from functools import wraps
|
|
import torch
|
|
import torch.cuda
|
|
import torch.distributed as dist
|
|
from torch.testing._internal.common_utils import TestCase, find_free_port, run_tests
|
|
from torch.distributed.distributed_c10d import _get_default_group
|
|
from torch.testing._internal.distributed.distributed_test import (
|
|
DistributedTest, TestDistBackend
|
|
)
|
|
|
|
# Warning printed when the C++ extension tests cannot run because ninja
# (the build tool used by torch.utils.cpp_extension) is not installed.
CPP_EXTENSIONS_WARNING = """
Ninja (https://ninja-build.org) must be available to run C++ extensions tests,
but it could not be found. Install ninja with `pip install ninja`
or `conda install ninja`.
"""

# Skip the whole module (exit code 0, i.e. "not a failure") when torch was
# built without distributed support.
if not dist.is_available():
    print("Distributed not available, skipping tests", file=sys.stderr)
    sys.exit(0)

# BACKEND is required and selects which test classes below are defined
# (raises KeyError if unset); INIT_METHOD defaults to env:// rendezvous.
BACKEND = os.environ["BACKEND"]
INIT_METHOD = os.getenv("INIT_METHOD", "env://")
|
|
|
|
|
|
def skip_if_no_ninja(func):
    """Decorator that short-circuits *func* when ninja is unavailable.

    When ``torch.utils.cpp_extension.verify_ninja_availability()`` raises
    ``RuntimeError``, a warning is printed and ``0`` is returned instead of
    running the wrapped test.
    """

    @wraps(func)
    def wrapper(*args, **kwargs):
        ninja_available = True
        try:
            import torch.utils.cpp_extension
            torch.utils.cpp_extension.verify_ninja_availability()
        except RuntimeError:
            # verify_ninja_availability raises RuntimeError when ninja is
            # not on PATH.
            ninja_available = False

        if not ninja_available:
            print(CPP_EXTENSIONS_WARNING)
            return 0

        return func(*args, **kwargs)

    return wrapper
|
|
|
|
|
|
if BACKEND in ("gloo", "nccl"):

    class TestDistBackendWithFork(TestDistBackend, DistributedTest._DistTestBase):
        """Runs the shared distributed test suite with fork-started workers."""

        def setUp(self):
            # Let TestDistBackend do its own setup first, then start the
            # worker processes via fork.
            super(TestDistBackendWithFork, self).setUp()
            self._fork_processes()
|
elif BACKEND == "mpi":
    # MPI launches the processes itself (e.g. via mpirun), so the process
    # group is initialized once here at import time instead of per-test.
    # WORLD_SIZE must be set by the launcher; read eagerly so a missing
    # variable fails at import rather than mid-test.
    WORLD_SIZE = os.environ["WORLD_SIZE"]
    dist.init_process_group(init_method=INIT_METHOD, backend="mpi")

    class TestMPI(DistributedTest._DistTestBase):
        # Inherits every test from _DistTestBase unchanged.
        pass
|
elif BACKEND == "test":
|
|
class TestBackendDynamicLoad(TestCase):
|
|
def setUp(self):
|
|
super(TestBackendDynamicLoad, self).setUp()
|
|
|
|
def _load_test_backend(self):
|
|
temp_dir = tempfile.mkdtemp()
|
|
src = "{}/../cpp_extensions/cpp_c10d_extension.cpp".format(os.path.abspath(os.path.dirname(__file__)))
|
|
extension = torch.utils.cpp_extension.load(
|
|
name="torch_test",
|
|
sources=[src],
|
|
build_directory=temp_dir
|
|
)
|
|
|
|
@skip_if_no_ninja
|
|
def test_backend_apis(self):
|
|
self._load_test_backend()
|
|
|
|
os.environ['WORLD_SIZE'] = '1'
|
|
os.environ['MASTER_ADDR'] = '127.0.0.1'
|
|
os.environ['MASTER_PORT'] = str(find_free_port())
|
|
os.environ['RANK'] = '0'
|
|
|
|
dist.init_process_group(backend='test', init_method='env://', world_size=1, rank=0)
|
|
self.assertEqual(dist.get_rank(), 0)
|
|
self.assertEqual(dist.get_world_size(), 1)
|
|
|
|
process_group = _get_default_group()
|
|
work = process_group.allreduce([torch.rand(1), torch.rand(1)])
|
|
self.assertTrue(work.wait())
|
|
self.assertTrue(work.is_completed())
|
|
self.assertTrue(work.is_success())
|
|
|
|
work = process_group.broadcast([torch.rand(1)])
|
|
self.assertTrue(work.wait())
|
|
self.assertTrue(work.is_completed())
|
|
self.assertTrue(work.is_success())
|
|
|
|
dist.destroy_process_group()
|
|
|
|
if __name__ == "__main__":
    # Worker processes are started with fork; a forked child cannot safely
    # re-use a CUDA context that was already initialized in the parent, so
    # guard against accidental CUDA initialization before tests run.
    # NOTE: this is an `assert`, so it is stripped under `python -O`.
    assert (
        not torch.cuda._initialized
    ), "test_distributed must not have initialized CUDA context on main process"

    run_tests()
|