[c10d] ProcessGroupGloo: support per operation timeouts (#158128)

This updates ProcessGroupGloo to support per-operation timeouts. Previously, per-operation timeouts were ignored even when they were set.

* This checks whether an operation's timeout is `kUnsetTimeout`: if it is, the default timeout from the context is used; otherwise the provided per-operation timeout is used.
* This exposes `set_timeout` as a standard method on ProcessGroup/Backend so we can test the global timeout.

Test plan:

```
pytest test/distributed/test_c10d_gloo.py -v -k allreduce_timeout
```

Pull Request resolved: https://github.com/pytorch/pytorch/pull/158128
Approved by: https://github.com/H-Huang, https://github.com/fduwjj
This commit is contained in:
Tristan Rice
2025-07-11 23:09:46 +00:00
committed by PyTorch MergeBot
parent a8ec7babcf
commit 2a8795a981
10 changed files with 241 additions and 66 deletions

View File

@ -2423,6 +2423,12 @@ communication mechanism.
then all leave the call together.
See :func:`torch.distributed.monitored_barrier` for more details.)")
.def(
"set_timeout",
&::c10d::ProcessGroup::setTimeout,
py::arg("timeout"),
py::call_guard<py::gil_scoped_release>(),
R"(Sets the default timeout for all future operations.)")
.def_property_readonly(
"_device_types", &::c10d::ProcessGroup::getDeviceTypes)
.def(
@ -2616,6 +2622,12 @@ Arguments:
"supports_time_estimate",
&::c10d::Backend::supportsTimeEstimation,
"(test whether the backend supports collective time estimation)")
.def(
"set_timeout",
&::c10d::Backend::setTimeout,
py::arg("timeout"),
py::call_guard<py::gil_scoped_release>(),
R"(Sets the default timeout for all future operations.)")
.def(
"broadcast",
&::c10d::Backend::broadcast,
@ -3068,10 +3080,7 @@ options :class:`~torch.distributed.ProcessGroupNCCL.Options`).
R"(Create a new ProcessGroupGloo instance.)")
.def(
"_set_default_timeout",
[](const c10::intrusive_ptr<::c10d::ProcessGroupGloo>& self,
std::chrono::milliseconds timeout) {
self->getOptions()->timeout = timeout;
},
&::c10d::ProcessGroupGloo::setTimeout,
py::arg("timeout"),
py::call_guard<py::gil_scoped_release>())
.def_property_readonly(
@ -3168,10 +3177,7 @@ options :class:`~torch.distributed.ProcessGroupNCCL.Options`).
&::c10d::ProcessGroupNCCL::getCommSplitCounter)
.def(
"_set_default_timeout",
[](const c10::intrusive_ptr<::c10d::ProcessGroupNCCL>& self,
std::chrono::milliseconds timeout) {
self->getOptions()->timeout = timeout;
},
&::c10d::ProcessGroupNCCL::setTimeout,
py::arg("timeout"),
py::call_guard<py::gil_scoped_release>())
.def(