Remove NO_MULTIPROCESSING_SPAWN checks (#146705)

Python 3.9 always has the spawn start method, so the NO_MULTIPROCESSING_SPAWN checks are dead code.
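For illustration only (not part of the change): on every platform Python 3.9 supports, spawn is a registered start method, so a guard such as NO_MULTIPROCESSING_SPAWN can never fire there.

import multiprocessing as mp

# "spawn" is always among the available start methods on CPython 3.9+,
# which is why the guard removed in this commit was dead code.
assert "spawn" in mp.get_all_start_methods()
ctx = mp.get_context("spawn")
print(ctx.get_start_method())  # -> "spawn"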

Pull Request resolved: https://github.com/pytorch/pytorch/pull/146705
Approved by: https://github.com/colesbury
Author: cyy
Date: 2025-02-28 05:53:19 +00:00
Committed by: PyTorch MergeBot
Parent: 3b4b23ab0b
Commit: b0dfd242fa
11 changed files with 23 additions and 156 deletions

View File

@@ -17,7 +17,6 @@ from torch.testing._internal.common_distributed import (
skip_if_rocm_multiprocess,
)
from torch.testing._internal.common_utils import (
NO_MULTIPROCESSING_SPAWN,
run_tests,
skip_but_pass_in_sandcastle_if,
TEST_WITH_DEV_DBG_ASAN,
@@ -47,10 +46,6 @@ if TEST_WITH_DEV_DBG_ASAN:
)
sys.exit(0)
if NO_MULTIPROCESSING_SPAWN:
print("Spawn not available, skipping tests.", file=sys.stderr)
sys.exit(0)
BACKEND = os.environ["BACKEND"]
if BACKEND == "gloo" or BACKEND == "nccl":

View File

@@ -37,7 +37,6 @@ from torch.testing._internal.common_utils import (
IS_CI,
IS_MACOS,
IS_WINDOWS,
NO_MULTIPROCESSING_SPAWN,
run_tests,
skip_but_pass_in_sandcastle_if,
skip_if_pytest,
@@ -509,11 +508,6 @@ if not (TEST_WITH_DEV_DBG_ASAN or IS_WINDOWS or IS_MACOS):
mpc._poll()
self.assertEqual(4, mock_join.call_count)
@skip_but_pass_in_sandcastle_if(
NO_MULTIPROCESSING_SPAWN,
"Disabled for environments that \
don't support multiprocessing with spawn start method",
)
def test_multiprocessing_context_poll_raises_exception(self):
mp_context = MultiprocessContext(
name="test_mp",

View File

@@ -8,7 +8,7 @@ import torch
import torch.distributed as c10d
import torch.multiprocessing as mp
from torch.testing._internal.common_distributed import MultiProcessTestCase
from torch.testing._internal.common_utils import load_tests, NO_MULTIPROCESSING_SPAWN
from torch.testing._internal.common_utils import load_tests
# Torch distributed.nn is not available in windows
@@ -27,10 +27,6 @@ if not c10d.is_available():
print("c10d not available, skipping tests", file=sys.stderr)
sys.exit(0)
if NO_MULTIPROCESSING_SPAWN:
print("spawn not available, skipping tests", file=sys.stderr)
sys.exit(0)
class AbstractProcessGroupShareTensorTest:
world_size = 2

View File

@@ -13,11 +13,7 @@ if not dist.is_available():
print("Distributed not available, skipping tests", file=sys.stderr)
sys.exit(0)
from torch.testing._internal.common_utils import (
NO_MULTIPROCESSING_SPAWN,
run_tests,
TEST_WITH_DEV_DBG_ASAN,
)
from torch.testing._internal.common_utils import run_tests, TEST_WITH_DEV_DBG_ASAN
from torch.testing._internal.distributed.distributed_test import (
DistributedTest,
TestDistBackend,
@@ -31,10 +27,6 @@ if TEST_WITH_DEV_DBG_ASAN:
)
sys.exit(0)
if NO_MULTIPROCESSING_SPAWN:
print("Spawn not available, skipping tests.", file=sys.stderr)
sys.exit(0)
_allowed_backends = ("gloo", "nccl", "ucc")
if (
"BACKEND" not in os.environ

View File

@@ -60,7 +60,6 @@ from torch.testing._internal.common_utils import (
IS_SANDCASTLE,
IS_WINDOWS,
load_tests,
NO_MULTIPROCESSING_SPAWN,
parametrize,
run_tests,
serialTest,
@@ -1088,11 +1087,6 @@ except RuntimeError as e:
@slowTest
@unittest.skipIf(TEST_WITH_ROCM, "ROCm doesn't support device side asserts")
@unittest.skipIf(
NO_MULTIPROCESSING_SPAWN,
"Disabled for environments that \
don't support multiprocessing with spawn start method",
)
def test_multinomial_invalid_probs_cuda(self):
self._spawn_test_multinomial_invalid_probs_cuda([1.0, -1.0, 1.0])
self._spawn_test_multinomial_invalid_probs_cuda([1.0, inf, 1.0])
@@ -1121,11 +1115,6 @@ except RuntimeError as e:
return err
@slowTest
@unittest.skipIf(
NO_MULTIPROCESSING_SPAWN,
"Disabled for environments that \
don't support multiprocessing with spawn start method",
)
@skipIfRocm
def test_index_out_of_bounds_exception_cuda(self):
test_method = TestCuda._test_index_bounds_cuda

View File

@@ -28,7 +28,6 @@ from torch.testing._internal.common_utils import (
IS_SANDCASTLE,
IS_WINDOWS,
load_tests,
NO_MULTIPROCESSING_SPAWN,
parametrize,
run_tests,
skipIfNoDill,
@@ -101,21 +100,20 @@ TEST_CUDA_IPC = (
TEST_MULTIGPU = TEST_CUDA_IPC and torch.cuda.device_count() > 1
if not NO_MULTIPROCESSING_SPAWN:
# We want to use `spawn` if able because some of our tests check that the
# data loader terminiates gracefully. To prevent hanging in the testing
# process, such data loaders are run in a separate subprocess.
#
# We also want to test the `pin_memory=True` configuration, thus `spawn` is
# required to launch such processes and they initialize the CUDA context.
#
# Mixing different start method is a recipe for disaster (e.g., using a fork
# `mp.Event` with a spawn `mp.Process` segfaults). So we set this globally
# to avoid bugs.
#
# Get a multiprocessing context because some test / third party library will
# set start_method when imported, and setting again triggers `RuntimeError`.
mp = mp.get_context(method="spawn")
# We want to use `spawn` if able because some of our tests check that the
# data loader terminates gracefully. To prevent hanging in the testing
# process, such data loaders are run in a separate subprocess.
#
# We also want to test the `pin_memory=True` configuration, thus `spawn` is
# required to launch such processes and they initialize the CUDA context.
#
# Mixing different start method is a recipe for disaster (e.g., using a fork
# `mp.Event` with a spawn `mp.Process` segfaults). So we set this globally
# to avoid bugs.
#
# Get a multiprocessing context because some test / third party library will
# set start_method when imported, and setting again triggers `RuntimeError`.
mp = mp.get_context(method="spawn")
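For reference, a minimal standalone sketch of the pattern described in the comment above; the worker function is illustrative and not taken from the test file:

import multiprocessing

def _worker(done):
    # Runs in a freshly spawned interpreter; `done` was created by the same
    # spawn context, so handing it to the child is safe.
    done.set()

if __name__ == "__main__":
    # get_context() returns a context bound to one start method without
    # changing the global default that another library may have set.
    spawn_ctx = multiprocessing.get_context("spawn")
    done = spawn_ctx.Event()
    p = spawn_ctx.Process(target=_worker, args=(done,))
    p.start()
    p.join()
    assert done.is_set()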
# 60s of timeout?
@@ -1430,7 +1428,7 @@ except RuntimeError as e:
p.terminate()
def test_timeout(self):
if TEST_CUDA and not NO_MULTIPROCESSING_SPAWN:
if TEST_CUDA:
# This test runs in a subprocess, which can only initialize CUDA with spawn.
# _test_timeout_pin_memory with pin_memory=True initializes CUDA when the iterator is
# constructed.
@@ -2313,8 +2311,7 @@ except RuntimeError as e:
def test_sampler(self):
self._test_sampler()
self._test_sampler(num_workers=4)
if not NO_MULTIPROCESSING_SPAWN:
self._test_batch_sampler(num_workers=4, multiprocessing_context="spawn")
self._test_batch_sampler(num_workers=4, multiprocessing_context="spawn")
def _test_batch_sampler(self, **kwargs):
# [(0, 1), (2, 3, 4), (5, 6), (7, 8, 9), ...]
@@ -2338,8 +2335,7 @@ except RuntimeError as e:
def test_batch_sampler(self):
self._test_batch_sampler()
self._test_batch_sampler(num_workers=4)
if not NO_MULTIPROCESSING_SPAWN:
self._test_batch_sampler(num_workers=4, multiprocessing_context="spawn")
self._test_batch_sampler(num_workers=4, multiprocessing_context="spawn")
@unittest.skipIf(not TEST_CUDA, "CUDA unavailable")
def test_shuffle_pin_memory(self):
@@ -2496,7 +2492,7 @@ except RuntimeError as e:
# not be called before process end. It is important to see that the
# processes still exit in both cases.
if pin_memory and (not TEST_CUDA or NO_MULTIPROCESSING_SPAWN or IS_WINDOWS):
if pin_memory and (not TEST_CUDA or IS_WINDOWS):
# This test runs in a subprocess, which can only initialize CUDA with spawn.
# DataLoader with pin_memory=True initializes CUDA when its iterator is constructed.
# For windows, pin_memory sometimes causes CUDA oom.
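The hunks above now pass multiprocessing_context="spawn" unconditionally; for reference, a minimal standalone sketch of that DataLoader parameter (the toy dataset is hypothetical, not from the test suite):

import torch
from torch.utils.data import DataLoader, TensorDataset

if __name__ == "__main__":
    dataset = TensorDataset(torch.arange(16, dtype=torch.float32))
    # multiprocessing_context accepts a start-method name or a context object;
    # these workers are started with spawn regardless of the platform default.
    loader = DataLoader(dataset, batch_size=4, num_workers=2,
                        multiprocessing_context="spawn")
    for batch in loader:
        pass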

View File

@@ -19,7 +19,6 @@ from torch.testing._internal.common_utils import (
IS_MACOS,
IS_WINDOWS,
load_tests,
NO_MULTIPROCESSING_SPAWN,
run_tests,
slowTest,
TEST_WITH_ASAN,
@@ -471,30 +470,17 @@ class TestMultiprocessing(TestCase):
with ctx.Pool(3) as pool:
pool.map(simple_autograd_function, [1, 2, 3])
@unittest.skipIf(
NO_MULTIPROCESSING_SPAWN, "Test needs to use spawn multiprocessing"
)
def test_autograd_fine_with_spawn(self):
ctx = mp.get_context("spawn")
simple_autograd_function()
with ctx.Pool(3) as pool:
pool.map(simple_autograd_function, [1, 2, 3])
@unittest.skipIf(
NO_MULTIPROCESSING_SPAWN,
"Disabled for environments that \
don't support multiprocessing with spawn start method",
)
@unittest.skipIf(not TEST_CUDA_IPC, "CUDA IPC not available")
def test_cuda_simple(self):
torch.cuda.FloatTensor([1]) # initialize CUDA outside of leak checker
self._test_sharing(mp.get_context("spawn"), "cuda", torch.float)
@unittest.skipIf(
NO_MULTIPROCESSING_SPAWN,
"Disabled for environments that \
don't support multiprocessing with spawn start method",
)
@unittest.skipIf(not TEST_CUDA_IPC, "CUDA IPC not available")
def test_cuda_memory_allocation(self):
ctx = mp.get_context("spawn")
@@ -512,11 +498,6 @@ class TestMultiprocessing(TestCase):
e.set()
p.join(1)
@unittest.skipIf(
NO_MULTIPROCESSING_SPAWN,
"Disabled for environments that \
don't support multiprocessing with spawn start method",
)
@unittest.skipIf(not TEST_CUDA_IPC, "CUDA IPC not available")
def test_cuda_ipc_deadlock(self):
ctx = mp.get_context("spawn")
@@ -536,11 +517,6 @@ class TestMultiprocessing(TestCase):
self.assertFalse(p.is_alive())
@slowTest
@unittest.skipIf(
NO_MULTIPROCESSING_SPAWN,
"Disabled for environments that \
don't support multiprocessing with spawn start method",
)
@unittest.skipIf(not TEST_CUDA_IPC, "CUDA IPC not available")
def test_cuda_send_many(self, name=None, size=5, count=100000):
ctx = mp.get_context("spawn")
@@ -572,11 +548,6 @@ class TestMultiprocessing(TestCase):
p2.join(1)
p3.join(1)
@unittest.skipIf(
NO_MULTIPROCESSING_SPAWN,
"Disabled for environments that \
don't support multiprocessing with spawn start method",
)
@unittest.skipIf(not TEST_CUDA_IPC, "CUDA IPC not available")
@unittest.skipIf(not TEST_MULTIGPU, "found only 1 GPU")
def test_cuda_small_tensors(self):
@@ -659,11 +630,6 @@ if __name__ == "__main__":
)
self.assertRegex(stderr, "Cannot re-initialize CUDA in forked subprocess.")
@unittest.skipIf(
NO_MULTIPROCESSING_SPAWN,
"Disabled for environments that \
don't support multiprocessing with spawn start method",
)
@unittest.skipIf(not TEST_CUDA_IPC, "CUDA IPC not available")
def test_event(self):
ctx = mp.get_context("spawn")
@@ -695,11 +661,6 @@ if __name__ == "__main__":
event.synchronize()
c2p.put(1) # notify parent synchronization is done
@unittest.skipIf(
NO_MULTIPROCESSING_SPAWN,
"Disabled for environments that \
don't support multiprocessing with spawn start method",
)
@unittest.skipIf(not TEST_CUDA_IPC, "CUDA IPC not available")
def test_event_multiprocess(self):
event = torch.cuda.Event(enable_timing=False, interprocess=True)
@@ -724,11 +685,6 @@ if __name__ == "__main__":
self.assertTrue(event.query())
p.join()
@unittest.skipIf(
NO_MULTIPROCESSING_SPAWN,
"Disabled for environments that \
don't support multiprocessing with spawn start method",
)
@unittest.skipIf(not TEST_CUDA_IPC, "CUDA IPC not available")
@unittest.skipIf(not TEST_MULTIGPU, "found only 1 GPU")
def test_event_handle_multi_gpu(self):
@@ -760,11 +716,6 @@ if __name__ == "__main__":
c2p.put(1) # notify synchronization is done in child
p2c.get() # wait for parent to finish before destructing child event
@unittest.skipIf(
NO_MULTIPROCESSING_SPAWN,
"Disabled for environments that \
don't support multiprocessing with spawn start method",
)
@unittest.skipIf(not TEST_CUDA_IPC, "CUDA IPC not available")
def test_event_handle_importer(self):
e0 = torch.cuda.Event(enable_timing=False, interprocess=True)
@@ -802,11 +753,6 @@ if __name__ == "__main__":
# destructing e1
p2c.get()
@unittest.skipIf(
NO_MULTIPROCESSING_SPAWN,
"Disabled for environments that \
don't support multiprocessing with spawn start method",
)
@unittest.skipIf(not TEST_CUDA_IPC, "CUDA IPC not available")
def test_event_handle_exporter(self):
e0 = torch.cuda.Event(enable_timing=False, interprocess=True)
@@ -933,7 +879,7 @@ if __name__ == "__main__":
@unittest.skipIf(TEST_WITH_ASAN, "non-deterministically hangs with ASAN")
def test_leaf_variable_sharing(self):
devices = ["cpu"]
if torch.cuda.is_available() and not NO_MULTIPROCESSING_SPAWN and TEST_CUDA_IPC:
if torch.cuda.is_available() and TEST_CUDA_IPC:
devices.append("cuda")
for device in devices:
for requires_grad in [True, False]:
@@ -968,11 +914,6 @@ if __name__ == "__main__":
RuntimeError, r"requires_grad", lambda: queue.put(var)
)
@unittest.skipIf(
NO_MULTIPROCESSING_SPAWN,
"Disabled for environments that \
don't support multiprocessing with spawn start method",
)
@unittest.skipIf(not TEST_CUDA_IPC, "CUDA IPC not available")
def test_cuda_variable_sharing(self):
for requires_grad in [True, False]:
@@ -983,11 +924,6 @@ if __name__ == "__main__":
)
self._test_autograd_sharing(var, mp.get_context("spawn"))
@unittest.skipIf(
NO_MULTIPROCESSING_SPAWN,
"Disabled for environments that \
don't support multiprocessing with spawn start method",
)
@unittest.skipIf(not TEST_CUDA_IPC, "CUDA IPC not available")
def test_mixed_types_cuda_sharing(self):
self._test_mixed_types_cuda_sharing(mp.get_context("spawn"))
@@ -996,29 +932,14 @@ if __name__ == "__main__":
param = Parameter(torch.arange(1.0, 26).view(5, 5))
self._test_autograd_sharing(param, is_parameter=True)
@unittest.skipIf(
NO_MULTIPROCESSING_SPAWN,
"Disabled for environments that \
don't support multiprocessing with spawn start method",
)
@unittest.skipIf(not TEST_CUDA_IPC, "CUDA IPC not available")
def test_cuda_parameter_sharing(self):
param = Parameter(torch.arange(1.0, 26, device="cuda").view(5, 5))
self._test_autograd_sharing(param, mp.get_context("spawn"), is_parameter=True)
@unittest.skipIf(
NO_MULTIPROCESSING_SPAWN,
"Disabled for environments that \
don't support multiprocessing with spawn start method",
)
def test_integer_parameter_serialization_cpu(self):
self._test_integer_parameter_serialization(device="cpu")
@unittest.skipIf(
NO_MULTIPROCESSING_SPAWN,
"Disabled for environments that \
don't support multiprocessing with spawn start method",
)
@unittest.skipIf(not TEST_CUDA_IPC, "CUDA IPC not available")
def test_integer_parameter_serialization_cuda(self):
self._test_integer_parameter_serialization(device="cuda")
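These CUDA sharing tests already required the spawn start method, which is why only the availability guard is removed; a minimal sketch of the underlying pattern (assumes a CUDA device and CUDA IPC support, so not Windows):

import torch
import torch.multiprocessing as mp

def _consumer(q):
    # Only a CUDA IPC handle crosses the process boundary; the child maps the
    # same device memory and verifies its contents.
    t = q.get()
    assert t.is_cuda and t.sum().item() == 4

if __name__ == "__main__":
    ctx = mp.get_context("spawn")
    q = ctx.Queue()
    p = ctx.Process(target=_consumer, args=(q,))
    p.start()
    shared = torch.ones(4, device="cuda")
    q.put(shared)  # keep `shared` alive until the consumer has used it
    p.join()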

View File

@@ -12,7 +12,6 @@ import torch.multiprocessing as mp
from torch.testing._internal.common_utils import (
IS_WINDOWS,
NO_MULTIPROCESSING_SPAWN,
run_tests,
TestCase,
parametrize,
@@ -212,9 +211,6 @@ class _TestMultiProcessing:
self.assertLess(time.time() - start, nested_child_sleep / 2)
time.sleep(0.1)
@unittest.skipIf(
NO_MULTIPROCESSING_SPAWN,
"Disabled for environments that don't support the spawn start method")
class SpawnTest(TestCase, _TestMultiProcessing):
start_method = 'spawn'
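For context, a minimal standalone example of launching workers with the spawn start method through torch.multiprocessing; the worker function is illustrative only:

import torch.multiprocessing as mp

def _worker(rank):
    # Each worker receives its index as the first argument.
    print(f"worker {rank} launched with spawn")

if __name__ == "__main__":
    # start_processes() takes an explicit start_method; mp.spawn() is the
    # shorthand that always uses "spawn".
    mp.start_processes(_worker, nprocs=2, start_method="spawn")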

View File

@@ -36,7 +36,7 @@ from torch.testing._internal.common_optimizers import (
from torch.testing._internal.common_utils import ( # type: ignore[attr-defined]
MI300_ARCH, TEST_WITH_TORCHINDUCTOR, TEST_WITH_ROCM, run_tests, IS_JETSON,
IS_FILESYSTEM_UTF8_ENCODING, NO_MULTIPROCESSING_SPAWN,
IS_FILESYSTEM_UTF8_ENCODING,
IS_SANDCASTLE, IS_FBCODE, IS_REMOTE_GPU, skipIfRocmArch, skipIfTorchInductor, load_tests, slowTest, slowTestIf,
skipIfCrossRef, TEST_WITH_CROSSREF, skipIfTorchDynamo, skipRocmIfTorchInductor, set_default_dtype,
skipCUDAMemoryLeakCheckIf, BytesIOContext,
@@ -9603,8 +9603,6 @@ tensor([[[1.+1.j, 1.+1.j, 1.+1.j, ..., 1.+1.j, 1.+1.j, 1.+1.j],
# FIXME: port to a distributed test suite
@slowTest
@unittest.skipIf(NO_MULTIPROCESSING_SPAWN, "Disabled for environments that \
don't support multiprocessing with spawn start method")
def test_multinomial_invalid_probs(self):
def _spawn_method(self, method, arg):
try:

View File

@@ -1437,11 +1437,7 @@ NOTEST_CPU = "cpu" in split_if_not_empty(os.getenv('PYTORCH_TESTING_DEVICE_EXCEP
skipIfNoDill = unittest.skipIf(not TEST_DILL, "no dill")
# Python 2.7 doesn't have spawn
NO_MULTIPROCESSING_SPAWN: bool = TestEnvironment.def_flag(
"NO_MULTIPROCESSING_SPAWN",
env_var="NO_MULTIPROCESSING_SPAWN",
)
NO_MULTIPROCESSING_SPAWN: bool = False
TEST_WITH_ASAN: bool = TestEnvironment.def_flag(
"TEST_WITH_ASAN",
env_var="PYTORCH_TEST_WITH_ASAN",

View File

@@ -82,7 +82,6 @@ from torch.testing._internal.common_utils import (
IS_WINDOWS,
FILE_SCHEMA,
IS_FBCODE,
NO_MULTIPROCESSING_SPAWN,
IS_SANDCASTLE,
skip_but_pass_in_sandcastle,
skip_but_pass_in_sandcastle_if,
@@ -5119,11 +5118,6 @@ class DistributedTest:
BACKEND not in DistTestCases.backend_feature["cuda"],
f"The {BACKEND} backend does not support DDP communication hook on CUDA devices",
)
@skip_but_pass_in_sandcastle_if(
NO_MULTIPROCESSING_SPAWN,
"Disabled for environments that \
don't support multiprocessing with spawn start method",
)
@skip_if_lt_x_gpu(int(os.environ["WORLD_SIZE"]))
def test_ddp_hook_parity_post_localSGD(self):
# Although we start run local SGD at iteration 10, since we still use the global process group to run it,