Revert "Remove NO_MULTIPROCESSING_SPAWN checks (#146705)"

This reverts commit 40ad5e01dff05c7d64e070fb01683820e678f788.

Reverted https://github.com/pytorch/pytorch/pull/146705 on behalf of https://github.com/cyyever due to Broke lint?, I guess land race with rufff update ([comment](https://github.com/pytorch/pytorch/pull/146705#issuecomment-2689603077))
This commit is contained in:
PyTorch MergeBot
2025-02-28 03:04:37 +00:00
parent 3ce352e389
commit 926b7b5027
11 changed files with 157 additions and 24 deletions

View File

@ -19,6 +19,7 @@ from torch.testing._internal.common_utils import (
IS_MACOS,
IS_WINDOWS,
load_tests,
NO_MULTIPROCESSING_SPAWN,
run_tests,
slowTest,
TEST_WITH_ASAN,
@ -470,17 +471,30 @@ class TestMultiprocessing(TestCase):
with ctx.Pool(3) as pool:
pool.map(simple_autograd_function, [1, 2, 3])
@unittest.skipIf(
NO_MULTIPROCESSING_SPAWN, "Test needs to use spawn multiprocessing"
)
def test_autograd_fine_with_spawn(self):
ctx = mp.get_context("spawn")
simple_autograd_function()
with ctx.Pool(3) as pool:
pool.map(simple_autograd_function, [1, 2, 3])
@unittest.skipIf(
NO_MULTIPROCESSING_SPAWN,
"Disabled for environments that \
don't support multiprocessing with spawn start method",
)
@unittest.skipIf(not TEST_CUDA_IPC, "CUDA IPC not available")
def test_cuda_simple(self):
torch.cuda.FloatTensor([1]) # initialize CUDA outside of leak checker
self._test_sharing(mp.get_context("spawn"), "cuda", torch.float)
@unittest.skipIf(
NO_MULTIPROCESSING_SPAWN,
"Disabled for environments that \
don't support multiprocessing with spawn start method",
)
@unittest.skipIf(not TEST_CUDA_IPC, "CUDA IPC not available")
def test_cuda_memory_allocation(self):
ctx = mp.get_context("spawn")
@ -498,6 +512,11 @@ class TestMultiprocessing(TestCase):
e.set()
p.join(1)
@unittest.skipIf(
NO_MULTIPROCESSING_SPAWN,
"Disabled for environments that \
don't support multiprocessing with spawn start method",
)
@unittest.skipIf(not TEST_CUDA_IPC, "CUDA IPC not available")
def test_cuda_ipc_deadlock(self):
ctx = mp.get_context("spawn")
@ -517,6 +536,11 @@ class TestMultiprocessing(TestCase):
self.assertFalse(p.is_alive())
@slowTest
@unittest.skipIf(
NO_MULTIPROCESSING_SPAWN,
"Disabled for environments that \
don't support multiprocessing with spawn start method",
)
@unittest.skipIf(not TEST_CUDA_IPC, "CUDA IPC not available")
def test_cuda_send_many(self, name=None, size=5, count=100000):
ctx = mp.get_context("spawn")
@ -548,6 +572,11 @@ class TestMultiprocessing(TestCase):
p2.join(1)
p3.join(1)
@unittest.skipIf(
NO_MULTIPROCESSING_SPAWN,
"Disabled for environments that \
don't support multiprocessing with spawn start method",
)
@unittest.skipIf(not TEST_CUDA_IPC, "CUDA IPC not available")
@unittest.skipIf(not TEST_MULTIGPU, "found only 1 GPU")
def test_cuda_small_tensors(self):
@ -630,6 +659,11 @@ if __name__ == "__main__":
)
self.assertRegex(stderr, "Cannot re-initialize CUDA in forked subprocess.")
@unittest.skipIf(
NO_MULTIPROCESSING_SPAWN,
"Disabled for environments that \
don't support multiprocessing with spawn start method",
)
@unittest.skipIf(not TEST_CUDA_IPC, "CUDA IPC not available")
def test_event(self):
ctx = mp.get_context("spawn")
@ -661,6 +695,11 @@ if __name__ == "__main__":
event.synchronize()
c2p.put(1) # notify parent synchronization is done
@unittest.skipIf(
NO_MULTIPROCESSING_SPAWN,
"Disabled for environments that \
don't support multiprocessing with spawn start method",
)
@unittest.skipIf(not TEST_CUDA_IPC, "CUDA IPC not available")
def test_event_multiprocess(self):
event = torch.cuda.Event(enable_timing=False, interprocess=True)
@ -685,6 +724,11 @@ if __name__ == "__main__":
self.assertTrue(event.query())
p.join()
@unittest.skipIf(
NO_MULTIPROCESSING_SPAWN,
"Disabled for environments that \
don't support multiprocessing with spawn start method",
)
@unittest.skipIf(not TEST_CUDA_IPC, "CUDA IPC not available")
@unittest.skipIf(not TEST_MULTIGPU, "found only 1 GPU")
def test_event_handle_multi_gpu(self):
@ -716,6 +760,11 @@ if __name__ == "__main__":
c2p.put(1) # notify synchronization is done in child
p2c.get() # wait for parent to finish before destructing child event
@unittest.skipIf(
NO_MULTIPROCESSING_SPAWN,
"Disabled for environments that \
don't support multiprocessing with spawn start method",
)
@unittest.skipIf(not TEST_CUDA_IPC, "CUDA IPC not available")
def test_event_handle_importer(self):
e0 = torch.cuda.Event(enable_timing=False, interprocess=True)
@ -753,6 +802,11 @@ if __name__ == "__main__":
# destructing e1
p2c.get()
@unittest.skipIf(
NO_MULTIPROCESSING_SPAWN,
"Disabled for environments that \
don't support multiprocessing with spawn start method",
)
@unittest.skipIf(not TEST_CUDA_IPC, "CUDA IPC not available")
def test_event_handle_exporter(self):
e0 = torch.cuda.Event(enable_timing=False, interprocess=True)
@ -879,7 +933,7 @@ if __name__ == "__main__":
@unittest.skipIf(TEST_WITH_ASAN, "non-deterministically hangs with ASAN")
def test_leaf_variable_sharing(self):
devices = ["cpu"]
if torch.cuda.is_available() and TEST_CUDA_IPC:
if torch.cuda.is_available() and not NO_MULTIPROCESSING_SPAWN and TEST_CUDA_IPC:
devices.append("cuda")
for device in devices:
for requires_grad in [True, False]:
@ -914,6 +968,11 @@ if __name__ == "__main__":
RuntimeError, r"requires_grad", lambda: queue.put(var)
)
@unittest.skipIf(
NO_MULTIPROCESSING_SPAWN,
"Disabled for environments that \
don't support multiprocessing with spawn start method",
)
@unittest.skipIf(not TEST_CUDA_IPC, "CUDA IPC not available")
def test_cuda_variable_sharing(self):
for requires_grad in [True, False]:
@ -924,6 +983,11 @@ if __name__ == "__main__":
)
self._test_autograd_sharing(var, mp.get_context("spawn"))
@unittest.skipIf(
NO_MULTIPROCESSING_SPAWN,
"Disabled for environments that \
don't support multiprocessing with spawn start method",
)
@unittest.skipIf(not TEST_CUDA_IPC, "CUDA IPC not available")
def test_mixed_types_cuda_sharing(self):
self._test_mixed_types_cuda_sharing(mp.get_context("spawn"))
@ -932,14 +996,29 @@ if __name__ == "__main__":
param = Parameter(torch.arange(1.0, 26).view(5, 5))
self._test_autograd_sharing(param, is_parameter=True)
@unittest.skipIf(
NO_MULTIPROCESSING_SPAWN,
"Disabled for environments that \
don't support multiprocessing with spawn start method",
)
@unittest.skipIf(not TEST_CUDA_IPC, "CUDA IPC not available")
def test_cuda_parameter_sharing(self):
param = Parameter(torch.arange(1.0, 26, device="cuda").view(5, 5))
self._test_autograd_sharing(param, mp.get_context("spawn"), is_parameter=True)
@unittest.skipIf(
NO_MULTIPROCESSING_SPAWN,
"Disabled for environments that \
don't support multiprocessing with spawn start method",
)
def test_integer_parameter_serialization_cpu(self):
self._test_integer_parameter_serialization(device="cpu")
@unittest.skipIf(
NO_MULTIPROCESSING_SPAWN,
"Disabled for environments that \
don't support multiprocessing with spawn start method",
)
@unittest.skipIf(not TEST_CUDA_IPC, "CUDA IPC not available")
def test_integer_parameter_serialization_cuda(self):
self._test_integer_parameter_serialization(device="cuda")