This adds a non-blocking mode to queue_pop. This allows for workers to poll if work is ready without blocking the main loop. This is useful for the case where you want to have a GPU have maximum utilization when something only periodically is sent on the queue.
We also expose a `torch.distributed.QueueEmptyError` so users can catch the error and handle it accordingly.
Test plan:
```
pytest test/distributed/test_store.py -k queue -v -s -x
```
Pull Request resolved: https://github.com/pytorch/pytorch/pull/151485
Approved by: https://github.com/fduwjj, https://github.com/tianfengfrank
This adds queue operations as described in https://github.com/pytorch/pytorch/issues/150943.
This works by adding two new operations `queue_push` and `queue_pop`. The semantics are designed to be blocking with a timeout. Pushing will always succeed as the queue is infinite size. Popping will first call `wait` until the key is ready and then pop the value from the queue.
This implements queues for only: HashStore, TCPStore w/ libuv. FileStore and the legacy backends are not supported.
`wait` and `check` work for queue operations though queue_push will only wake up the first waiter rather than all of them.
This also has a few cleanups to error types/documentation in related code.
Example trace:
```
[I409 16:51:43.963833529 TCPStoreLibUvBackend.cpp:829] [c10d - trace] validate magic:1015412686 address:[localhost]:55816
[I409 16:51:43.963845838 TCPStoreLibUvBackend.cpp:842] [c10d - trace] ping nonce:2840795 address:[localhost]:55816
[I409 16:51:43.963902914 TCPStoreLibUvBackend.cpp:911] [c10d - trace] add key:init/ val:1 address:[localhost]:55816
[I409 16:51:43.963939389 TCPStoreLibUvBackend.cpp:977] [c10d - trace] wait key_count:1 keys[0]:init/ address:[localhost]:55816
[I409 16:51:43.963974842 TCPStoreLibUvBackend.cpp:893] [c10d - trace] get key:init/ address:[localhost]:55816
[I409 16:51:43.964071909 TCPStoreLibUvBackend.cpp:1121] [c10d - trace] queue_push key:/test_prefix/test_queue_support address:[localhost]:55816
[I409 16:51:43.964080221 TCPStoreLibUvBackend.cpp:940] [c10d - trace] check key_count:1 keys[0]:/test_prefix/foo address:[localhost]:55816
[I409 16:51:43.964108584 TCPStoreLibUvBackend.cpp:1121] [c10d - trace] queue_push key:/test_prefix/foo address:[localhost]:55816
[I409 16:51:43.964123207 TCPStoreLibUvBackend.cpp:1121] [c10d - trace] queue_push key:/test_prefix/foo address:[localhost]:55816
[I409 16:51:43.964128194 TCPStoreLibUvBackend.cpp:940] [c10d - trace] check key_count:1 keys[0]:/test_prefix/foo address:[localhost]:55816
[I409 16:51:43.964156347 TCPStoreLibUvBackend.cpp:977] [c10d - trace] wait key_count:1 keys[0]:/test_prefix/foo address:[localhost]:55816
[I409 16:51:43.964187493 TCPStoreLibUvBackend.cpp:977] [c10d - trace] wait key_count:1 keys[0]:/test_prefix/foo address:[localhost]:55816
[I409 16:51:43.964217709 TCPStoreLibUvBackend.cpp:1133] [c10d - trace] queue_pop key:/test_prefix/foo address:[localhost]:55816
[I409 16:51:43.964324300 TCPStoreLibUvBackend.cpp:977] [c10d - trace] wait key_count:1 keys[0]:/test_prefix/foo address:[localhost]:55816
[I409 16:51:43.964354495 TCPStoreLibUvBackend.cpp:1133] [c10d - trace] queue_pop key:/test_prefix/foo address:[localhost]:55816
[I409 16:51:43.964416299 TCPStoreLibUvBackend.cpp:940] [c10d - trace] check key_count:1 keys[0]:/test_prefix/foo address:[localhost]:55816
[I409 16:51:43.964458733 TCPStoreLibUvBackend.cpp:977] [c10d - trace] wait key_count:1 keys[0]:/test_prefix/non_existant address:[localhost]:55816
[W409 16:51:43.974516585 socket.cpp:460] [c10d] waitForInput: poll for socket SocketImpl(fd=75, addr=[localhost]:55816, remote=[localhost]:46641) returned 0, likely a timeout
[W409 16:51:43.974559169 socket.cpp:485] [c10d] waitForInput: socket SocketImpl(fd=75, addr=[localhost]:55816, remote=[localhost]:46641) timed out after 10ms
[I409 16:51:43.974600451 TCPStoreLibUvBackend.cpp:1101] [c10d - trace] cancel_wait address:[localhost]:55816
```
Test plan:
```
$ pytest test/distributed/test_store.py -k queue -v -s
test/distributed/test_store.py::FileStoreTest::test_queues SKIPPED [0.4351s] (Store does not support queues)
test/distributed/test_store.py::HashStoreTest::test_queues PASSED [0.0009s]
test/distributed/test_store.py::PrefixFileStoreTest::test_queues SKIPPED [0.0006s] (Store does not support queues)
test/distributed/test_store.py::TCPStoreTest::test_queues SKIPPED [0.0012s] (Store does not support queues)
test/distributed/test_store.py::LibUvTCPStoreTest::test_queues PASSED [0.0014s]
test/distributed/test_store.py::PrefixTCPStoreTest::test_queues PASSED [0.0014s]
```
Pull Request resolved: https://github.com/pytorch/pytorch/pull/150969
Approved by: https://github.com/XilunWu, https://github.com/fduwjj
This adds a new `clone()` method to Store which will return a new Store instance that can be used from a different thread.
This is intended to better support multiple threads with stores such as when ProcessGroupNCCL needs a store to do error propagation.
Related issue: https://github.com/pytorch/pytorch/issues/150943
Test plan:
```
pytest test/distributed/test_store.py -k PythonStore
pytest test/distributed/test_store.py -k clone
```
Pull Request resolved: https://github.com/pytorch/pytorch/pull/150966
Approved by: https://github.com/fduwjj
In c10d PG initialization, we wrap TCPStore with multiple layers of PrefixStore which adds layers of prefix.
One example is:
"default_pg/0//cuda//timeout_dump"
When initialized the default PG, because there is no store passed. We first add the prefix "default_pg" to the TCPStore returned from rendezvous:
bdeaaad70c/torch/distributed/distributed_c10d.py (L1240)
We then add pg_name (aka 0) bdeaaad70c/torch/distributed/distributed_c10d.py (L1376) and device (aka cuda) bdeaaad70c/torch/distributed/distributed_c10d.py (L1387)
to the prefix. Then when we call store_->set("timeout_dump"). The actual key used for writing into TCPStore is "default_pg/0//cuda//timeout_dump".
For sub-PG, things get even interesting, we put the store wrapped with default pg name to a cache:
bdeaaad70c/torch/distributed/distributed_c10d.py (L1517)
And when creating each subPG, it is append its PG name right after the cached store. The example keys are:
'default_pg/0//10//cuda//timeout_dump', 'default_pg/0//12//cuda//timeout_dump', 'default_pg/0//38//cuda//timeout_dump', 'default_pg/0//39//cuda//timeout_dump'. (10, 12, 38 and 39 are all PG names of each subPG created)
The reason why the number in the name is bumped up so high is because for each subPG creation, all ranks have to call the API together and the global variable used for PG name will be bumped up monolithically:
bdeaaad70c/torch/distributed/distributed_c10d.py (L3666)
Similar things happen for using hashing for PG names.
This has a potential issue, because each sub-PG has an instance of ProcessGroupNCCL, and if we want to set something global to notify all sub-PGs (and all ranks). This added prefix causes bugs. For example, if on sub-PG 1, we set a value to TCPStore with key ('default_pg/0//1//cuda//timeout_dump'), while we use the default PG instances to check the TCPStore, which are using the key ('default_pg/0//cuda//timeout_dump'), default PG instances will never get the notified signals. So in this PR, we added a new API in PrefixStore which we get the innermost non-PrefixStore for set and check. The next PR will make changes in NCCL watchdog.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/117074
Approved by: https://github.com/wconstab, https://github.com/H-Huang
This is reland of PRs #https://github.com/pytorch/pytorch/pull/108626 and #109564. We fixed the IOS build failure by changing
```
((CHECK) ? (EXPR) : ([] { assert(!#CHECK); }(), (EXPR)))
```
to
```
((CHECK) ? (EXPR) : ([] { assert(false); }(), (EXPR)))
```
in TR2_OPTIONAL_ASSERTED_EXPRESSION, since the former syntax was invalid on Apple Clang. Anyway, we could apply the simple fix hoping that c10::optional would be replaced by std::optional soon.
We also enabled -Wdeprecated on c10.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/110019
Approved by: https://github.com/clee2000
The feature was never fully finished and never got any adoption but
TCPStore pays the cost of twice the number of tcp connections anyway.
While the cost of all those idle connections is minimal is doesn't come for free:
- It increases the likelyhood of a connection refused failure during the initialization stampede.
- TCPStore uses poll for checking for socket availability which scales linearly on the number of sockets regardless of their status.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/105014
Approved by: https://github.com/fduwjj
Applies so more fixes to headers that may have been missed before for performance optimization.cc @jgong5 @mingfeima @XiaobingSuper @sanchitintel @ashokei @jingxu10 @EikanWang @ezyang since this more in the series of the clang-tidy fixup
This is PR fixes 3 main issues:
1. Use emplacement more in headers
1. Avoid unnecessary copies and use const ref when possible
1. Default any special functions when possible to make them potentially trivial and more readable.
1. There is also one change in this PR that tries to prevent unnecessary math promotion, the rest of these changes are in another PR
Pull Request resolved: https://github.com/pytorch/pytorch/pull/91445
Approved by: https://github.com/ezyang
…c10d
Fixes a broken header filters from #90699 and applies a few more clang-tidy fixes that are relevant from c10 and c10d. The header filter pattern was actually broken and the clang-tidy include pattern was redundant. Also fixed a few bugs in torch/distributed/c10d
Pull Request resolved: https://github.com/pytorch/pytorch/pull/91178
Approved by: https://github.com/ezyang
Headers under torch/csrc/distributed may be referened with relative path, e.g., "<c10d/...>". However, relative path cannot be gracefully handled by Meta internal build when the NCCL PG is hipified to support AMD/RCCL because the "hipified" header files are generated in other directories. Moreover, using absolute path for header inclusion is the state-of-the-art in most components in Pytorch. Thus, this patch refactors all header paths in torch/csrc/distributed to be absolute.
See D39835774 for more details about Meta internal complication.
**How to test**: commit 9e5d199 removes -I./torch/csrc/distributed in compile options. Thus use it to verify we don't miss any relative path use of torch/csrc/distributed headers.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/85780
Approved by: https://github.com/kumpera, https://github.com/huydhn
Headers under torch/csrc/distributed may be referened with relative path, e.g., "<c10d/...>". However, relative path cannot be gracefully handled by Meta internal build when the NCCL PG is hipified to support AMD/RCCL because the "hipified" header files are generated in other directories. Moreover, using absolute path for header inclusion is the state-of-the-art in most components in Pytorch. Thus, this patch refactors all header paths in torch/csrc/distributed to be absolute.
See D39835774 for more details about Meta internal complication.
**How to test**: commit 9e5d199 removes -I./torch/csrc/distributed in compile options. Thus use it to verify we don't miss any relative path use of torch/csrc/distributed headers.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/85780
Approved by: https://github.com/kumpera
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/60543
Since now c10d is part of libtorch, it would also be nice if the sources lived all in one place.
ghstack-source-id: 132306292
Test Plan: It builds
Reviewed By: cbalioglu
Differential Revision: D29062002
fbshipit-source-id: d9e1301e9d73e1643fa0f0119cd2d618f1ad52e6