Revert "Ensure ncclCommAbort can abort stuck ncclCommInitRank (#103264)"

This reverts commit 03881b0c925f191ec41d6899d589ed420ac285b5.

Reverted https://github.com/pytorch/pytorch/pull/103264 on behalf of https://github.com/osalpekar due to: This commit seems to have been causing failures in test_nccl_init_abort. Those failures may have been masked by pre-existing failures in the distributed jobs on trunk when CI ran on this PR. Since those breaking changes are now reverted, we should be able to rebase this PR, get a clean signal, and uncover the breakages it caused. ([comment](https://github.com/pytorch/pytorch/pull/103264#issuecomment-1599451197))
Author: PyTorch MergeBot
Date: 2023-06-20 20:29:43 +00:00
Parent: d06fc1bfda
Commit: f7737bb96b
3 changed files with 7 additions and 76 deletions

@@ -9725,47 +9725,6 @@ class DistributedTest:
            ddp._check_reducer_finalized()
            ddp(input)

        @skip_if_lt_x_gpu(2)
        @skip_but_pass_in_sandcastle_if(
            BACKEND != "nccl",
            "TORCH_NCCL_USE_COMM_NONBLOCKING only applies to NCCL"
        )
        def test_nccl_init_abort(self):
            """
            Tests that we can abort a NCCL communicator during initialization and
            recover appropriately.
            """
            # Reinitialize global process group with TORCH_NCCL_USE_COMM_NONBLOCKING=1
            os.environ["TORCH_NCCL_USE_COMM_NONBLOCKING"] = "1"
            dist.destroy_process_group()
            timeout = timedelta(seconds=1)
            dist.init_process_group(
                init_method=INIT_METHOD,
                backend=BACKEND,
                world_size=int(os.environ["WORLD_SIZE"]),
                rank=self.rank,
                timeout=timeout,
            )

            # Abort pg in background thread.
            running = True

            def abort():
                pg = _get_default_group()
                while running:
                    pg._get_backend(torch.device(0))._abort()
                    time.sleep(1)

            if self.rank != 1:
                import threading
                t = threading.Thread(target=abort)
                t.start()
                with self.assertRaises(RuntimeError):
                    # First collective triggers initialization via ncclCommInitRank.
                    torch.distributed.barrier()
                running = False
                t.join()

        @skip_if_lt_x_gpu(2)