We introduced the dispatchable backend for a ProcessGroup and collective in https://github.com/pytorch/pytorch/issues/86225. This PR is a follow-up cleanup to clean up the option of a ProcessGroup and ask users to either set timeout or backend later on or directly create backend after creating a PG.
Also PGNCCL is using option class from ProcessGroup but we actually should use Option from backend class. So this PR is to make the type or name to be aligned with what we are doing in cpp side. I don't change the signature for the public API, so they still use args named "pg_options"
We need to make changes to the test to make it aligned with the change.
This is try to reland D62008954 by fixing internal errors.
Differential Revision: [D62483294](https://our.internmc.facebook.com/intern/diff/D62483294/)
Pull Request resolved: https://github.com/pytorch/pytorch/pull/135653
Approved by: https://github.com/wz337, https://github.com/H-Huang
We introduced the dispatchable backend for a ProcessGroup and collective in https://github.com/pytorch/pytorch/issues/86225. This PR is a follow-up cleanup to clean up the option of a ProcessGroup and ask users to either set timeout or backend later on or directly create backend after creating a PG.
Also PGNCCL is using option class from ProcessGroup but we actually should use Option from backend class. So this PR is to make the type or name to be aligned with what we are doing in cpp side. I don't change the signature for the public API, so they still use args named "pg_options"
We need to make changes to the test to make it aligned with the change.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/132931
Approved by: https://github.com/H-Huang
This PR continues to clean clang-tidy warnings in torch/csrc/distributed/c10d, following #124701. In addition, libfmt dependency is added in CMake code to enable using it in the headers. The libfmt has to be added as private dependency to torch_cuda and torch_hip because they include torch/csrc/distributed/c10d/Utils.hpp which uses libfmt.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/124987
Approved by: https://github.com/malfet
Summary:
We need a way to allow user set a customized description for a process group, e.g. FSDP, PP.
Here are several use cases of user specified group_desc:
- Logging: we can easily match a log line and understand what's this collective/pg is used to.
- Pytorch traces (e.g. Kineto, Execution Trace) can benefit from the PG desc since trace analysis, benchmarks will be able to easily differentiate PG purpose like FSDP, PP.
- Lower layer collectives(e.g. NCCL) debug: we will be able to expose PG desc to NCCL communicator so NCCL layer operations can be easily correlated to a PG.
Solution: Add a group_desc field to c10d
Differential Revision: D55781850
Pull Request resolved: https://github.com/pytorch/pytorch/pull/123472
Approved by: https://github.com/kwen2501
Previously we could only use `ncclCommSplit` when we knew all backends were connected on all shards (due to the need to perform a NOCOLOR split), which in practice meant we could only use it for subgroups that were copies of the entire world.
This change allows for specifying a bound device id to `init_process_group` which tells the pg and its backends that the specified device, and the specified device only, will be associated with this rank.
This guarantee lets us do an early connect (which we could not previously do due to how ProcessGroupNCCL infers devices based on tensors and not the rank number). And by doing the early connect, we have the guarantee ranks are connected and can perform nocolor splits when needed.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/114916
Approved by: https://github.com/kwen2501
This PR introduces a native version of c10d_functional ops. The main goal is to add collective support in AOTInductor and allow collective ops to work in multi-threaded native runtimes.
The native version also incorporated API improvements we wished to implement in Python c10d_functional:
- Removed `ranks` and `group_size` from collective op signatures which were proven to be redundant.
- Use tensor storage as opposed to `void*` to resolve in-flight work.
The native process group registration/resolution mechansim is only used for native c10d_functional in the PR. It will become the single source of truth in upcoming PRs.
The upcoming PRs will implement Inductor/AOTInductor support for c10d_functional, after which native c10d_functional will replace Python c10d_functional.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/110570
Approved by: https://github.com/wanchaol
For synchronous ops (i.e. `asyncOp = False`), we don't want to record streams because we know that the NCCL stream will join back to the "current" stream right after this op. So we might just as well keep the stream ownership of the input/output tensors unchanged. The benefit would be that the allocation/free of the tensors would look deterministic to the "current" stream so that the caching allocator can reuse memory pool for this stream in a clever way.
To prevent the input/output tensors from being recycled by python, we rely on the stashing mechanism in ProcessGroupNCCL (which can be also turned on by setting `TORCH_NCCL_AVOID_RECORD_STREAMS=1`).
This mechanism change is for libraries like FSDP which uses `all_gather_into_tensor` and `reduce_scatter_tensor` in a synchronous way and which cannot set `TORCH_NCCL_AVOID_RECORD_STREAMS=1` for their users. And therefore, this change is limited to these two collectives for now.
Cc: @awgu @janeyx99 @albanD
Pull Request resolved: https://github.com/pytorch/pytorch/pull/111431
Approved by: https://github.com/H-Huang
Sequence numbers must be associated with a Work object
if we want to use it as a way to report collective progress.
The API surface change is introducing Work::getSequenceNumber, which
should eventually be exposed to python.
The bulk of this change is changing gloo to make the sequence number
be always in use and weave it to the dozens subclasses of Work.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/109136
Approved by: https://github.com/fduwjj
Collectives timing gates the tracking when a collective starts on device.
Currently it's enabled by set the NCCL_ENABLE_TIMING env var.
The goal of this PR is to make it possible to dynamically enable that flag so users of the PG hooks don't have to set that flag in order to have their hooks work.
The design is that once set, all new collectives will have such behavior so we track it on each Work object.
We make enableTiming_ atomic in PGNCCL to avoid races on non-TSO hardware.
To ensure consistency, we copy its value during Work construction and replace all previous usage of enableTiming_ from the PG with usages from the Work, which now has an immutable value.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/108814
Approved by: https://github.com/wconstab, https://github.com/fduwjj
ghstack dependencies: #108813
This allows infra/trainers to get detailed stats about communication
efficiencies without know anything about what model or distributed
training paradigms have been used. This is helpful as infra/trainer
package usually prefers to be as model/algorithm agnostic as possible.
Therefore, we cannot assume that infra/trainer can have access to all
collectives used by the model authors.
This commit adds an `OnCompletion` hook to `ProcessGroupNCCL` which
will be fired on every work completion event.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/107233
Approved by: https://github.com/kumpera
This allows infra/trainers to get detailed stats about communication
efficiencies without know anything about what model or distributed
training paradigms have been used. This is helpful as infra/trainer
package usually prefers to be as model/algorithm agnostic as possible.
Therefore, we cannot assume that infra/trainer can have access to all
collectives used by the model authors.
This commit adds an `OnCompletion` hook to `ProcessGroupNCCL` which
will be fired on every work completion event.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/106988
Approved by: https://github.com/kumpera, https://github.com/H-Huang
ghstack dependencies: #107140, #107141, #107160
Map of #101157.
This PR adds support for coalesced `reduce_scatter_tensor` calls in the following syntax:
Sync communication style:
```
with dist._coalescing_manager():
for i in range(num_coll):
dist.reduce_scatter_tensor(output_tensors[i], input_tensors[i])
```
Async communication style:
```
with dist._coalescing_manager(async_ops=True) as cm:
for i in range(num_coll):
dist.reduce_scatter_tensor(output_tensors[i], input_tensors[i])
# do a bunch of other things
cm.wait()
# do things that depend on the reduce-scatters' results
```
Each `reduce_scatter_tensor` call can be independent in terms of their data and buffer locations. But could be executed in parallel by supported backends (like NCCL).
Pull Request resolved: https://github.com/pytorch/pytorch/pull/103561
Approved by: https://github.com/fegin
This PR adds support for the following use cases:
- Sync style:
```
with dist._coalescing_manager():
for i in range(num_coll):
dist.all_gather_into_tensor(output_tensors[i], input_tensors[i])
```
- Async style:
```
with dist._coalescing_manager(async_ops=True) as cm:
for i in range(num_coll):
dist.all_gather_into_tensor(output_tensors[i], input_tensors[i])
# do a bunch of other things
cm.wait()
# do things that depend on the all-gather's
```
Each `all_gather_into_tensor` would be independent in terms of data and their buffer location. But could be executed in parallel by supported backends (like NCCL).
Pull Request resolved: https://github.com/pytorch/pytorch/pull/101157
Approved by: https://github.com/kumpera, https://github.com/wanchaol
### Description
The PR aims at reducing CPU overhead of context manager style coalescing.
By "context manager style coalescing", we mean:
Sync style:
```
with _coalescing_manager():
for i in range(num_coll):
dist.all_reduce(tensors[i])
```
Async style:
```
with _coalescing_manager(async_ops=True) as cm:
for i in range(num_coll):
dist.all_reduce(tensors[i])
cm.wait()
```
In previous implementation, each collective in the `num_coll` loop actually calls into the C++ backend, accumulating pybind overhead.
In the new implementation, we capture the collectives at Python level, and only fire towards C++ at the exit of the coalescing manager.
### Tests
In current PR, the "fast path" only applies to all-reduce.
- Flattened 512M: 16.38 ms, including CPU time 131.21 us
- Old _coalescing_manager 64 x 8M: 22.19 ms, including CPU time 2865 us
- New _coalescing_manager 64 x 8M: 16.93 ms, including CPU time 635 us
Hence a 4x reduction in CPU overhead (dependent on `num_coll`).
Cc @mrshenli @kumpera @wanchaol @fegin
Pull Request resolved: https://github.com/pytorch/pytorch/pull/98793
Approved by: https://github.com/kumpera
### Description
The PR aims at reducing CPU overhead of context manager style coalescing.
By "context manager style coalescing", we mean:
Sync style:
```
with _coalescing_manager():
for i in range(num_coll):
dist.all_reduce(tensors[i])
```
Async style:
```
with _coalescing_manager(async_ops=True) as cm:
for i in range(num_coll):
dist.all_reduce(tensors[i])
cm.wait()
```
In previous implementation, each collective in the `num_coll` loop actually calls into the C++ backend, accumulating pybind overhead.
In the new implementation, we capture the collectives at Python level, and only fire towards C++ at the exit of the coalescing manager.
### Tests
In current PR, the "fast path" only applies to all-reduce.
- Flattened 512M: 16.38 ms, including CPU time 131.21 us
- Old _coalescing_manager 64 x 8M: 22.19 ms, including CPU time 2865 us
- New _coalescing_manager 64 x 8M: 16.93 ms, including CPU time 635 us
Hence a 4x reduction in CPU overhead (dependent on `num_coll`).
Cc @mrshenli @kumpera @wanchaol @fegin
Pull Request resolved: https://github.com/pytorch/pytorch/pull/98793
Approved by: https://github.com/kumpera
Apply clang-tidy fixups to prefer member initializer and modernize-pass-by-value. This is a mostly a noop, but it should make a few ctors slighlty more readable and more efficient. Also drops in some missing moves that prevents a lot of unnecessary copying.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/91538
Approved by: https://github.com/ezyang
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/88330
### Implementation
Move backend-specific (NCCL, Gloo, etc) collective implementations to corresponding `Backend` class. Update ProcessGroup to support multiple backends and use dispatcher to calls backends based on tensor device type.
### Changes
#### c++ changes (ProcessGroup files, `Ops.cpp`, `init.cpp`)
- Update pybind definitions for new process group base class and new backend class
- Update pybinded backend class with collective definitions to keep BC with Python PG instances (e.g. `dist.ProcessGroupGloo`, `dist.ProcessGroupNCCL`) which are used in tests
- Switch `ProcessGroupGloo`, `ProcessGroupNCCL`, `ProcessGroupMPI`, `ProcessGroupUCC` to derive from the `Backend` class.
- Update CPU/CUDA `Ops.cpp` and `OpsImpl.cpp` to perform this dispatching by querying the backend using the device type
- Update internal dispatched implementation of `barrier` to use a tensor which allows operation to be dispatched.
- Update `allgather` collective to use `TensorList`. For some reason it was using the default implementation of `allgather` rather than dispatching it correctly. I still don't understand why and had originally filed an issue in 85122.
#### python changes (`distributed_c10d.py`, test files)
- Add BackendConfig class to specify the default configurations of backends and `get_backend_config()` API
- `get_backend()` deprecation warning
- `init_process_group` how returns a generic `ProcessGroup` object, it contains a list of backends (the ones stated above) which it will dispatch operations to.
- `new_group` updated to return the same as above
- Update `test_c10d_gloo.py`, Update `DistributedDataParallelTest` to use `init_process_group`, Update `ReducerTest`, update `test_broadcast_coalesced_gloo` to move from PG instance and gloo options
- Update `test_c10d_nccl.py`, Update `DistributedDataParallelTest` to use `init_process_group`
- Specific tests updated: `test_Backend_enum_class`
### Changes missing
- lazy initialization of backends
- support parsing of BackendConfig
### open questions
- Pure Python PG extensions (https://github.com/pytorch/pytorch/pull/66338)
# Example
This is a basic script (using 2 backends within a process group)
```python
# python -m torch.distributed.run --nnodes=1 --nproc_per_node=2 basic_scenario.py
import torch.distributed as dist
import torch
import os
if __name__ == "__main__":
rank = os.environ.get("RANK")
# initialize with both gloo and nccl
dist.init_process_group()
# with gloo
dist.all_reduce(torch.tensor([1.0]))
print(f"Rank {rank} finished")
# with nccl
dist.all_reduce(torch.tensor([1.0], device=f"cuda:{rank}"))
```
Test Plan: Imported from OSS
Differential Revision: D42069829
Pulled By: H-Huang
Pull Request resolved: https://github.com/pytorch/pytorch/pull/90997
Approved by: https://github.com/awgu, https://github.com/fduwjj
Headers under torch/csrc/distributed may be referened with relative path, e.g., "<c10d/...>". However, relative path cannot be gracefully handled by Meta internal build when the NCCL PG is hipified to support AMD/RCCL because the "hipified" header files are generated in other directories. Moreover, using absolute path for header inclusion is the state-of-the-art in most components in Pytorch. Thus, this patch refactors all header paths in torch/csrc/distributed to be absolute.
See D39835774 for more details about Meta internal complication.
**How to test**: commit 9e5d199 removes -I./torch/csrc/distributed in compile options. Thus use it to verify we don't miss any relative path use of torch/csrc/distributed headers.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/85780
Approved by: https://github.com/kumpera, https://github.com/huydhn
Headers under torch/csrc/distributed may be referened with relative path, e.g., "<c10d/...>". However, relative path cannot be gracefully handled by Meta internal build when the NCCL PG is hipified to support AMD/RCCL because the "hipified" header files are generated in other directories. Moreover, using absolute path for header inclusion is the state-of-the-art in most components in Pytorch. Thus, this patch refactors all header paths in torch/csrc/distributed to be absolute.
See D39835774 for more details about Meta internal complication.
**How to test**: commit 9e5d199 removes -I./torch/csrc/distributed in compile options. Thus use it to verify we don't miss any relative path use of torch/csrc/distributed headers.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/85780
Approved by: https://github.com/kumpera
### Changes
- Move ProcessGroup::Work into its own class and update all the references to it / header includes.
#### Motivation
In the future PRs we will repurpose ProcessGroup to instead contain a list of Backends (ProcessGroupNCCL/Gloo/UCC) and perform dispatching to them based on tensor type. This change is prevent a circular dependency with ProcessGroup depending on Backend and Backend depending on ProcessGroup::Work.
Differential Revision: [D38839212](https://our.internmc.facebook.com/intern/diff/D38839212)
Pull Request resolved: https://github.com/pytorch/pytorch/pull/83680
Approved by: https://github.com/kwen2501
Summary:
`batch_isend_irecv` previously required the use of `torch.cuda.synchronize` to avoid data race conditions. This was because the ncclStreams were recorderd in the returned ncclWork object _before_ a ncclGroupEnd by the `_batch_p2p_manager` was issued. Thus, the `req.wait()` was effectively waiting on nothing, leading to the later operators working on incorrect intermediate data.
This fix:
- keeps track of ncclStreams to wait on, and records them in the work objects after the batch manager issues a ncclGroupEnd
- renames the `_batch_p2p_manager` to `_coalescing_manager` for generality
- removes the explicit check for NCCL backend inside `_batch_p2p_manager` in distributed_c10.py and moves the manager start/end to ProcessGroup.hpp, in order to transparently work with all process groups
Test Plan: Modified the unittest for `batch_isend_irecv` to check that received tensors are the same as expected tensors. Verified that the test fails before the change, and passes after the change.
Differential Revision: D38100789
Pull Request resolved: https://github.com/pytorch/pytorch/pull/82450
Approved by: https://github.com/kwen2501
This PR enables python process group usage with DDP by doing the following:
- Surface PG::Work::getFuture() as overridable()
- Use Work::getFuture() to retrieve values from a PG.
- Add _create_work_from_future python method that creates a Work object that wraps a Future.
To test this changes we use both strategies to run DDP with a python based PG.
The reason for offering two methods is that both have short-comings.
The wrapper method is harder to troubleshoot as there's no visibility of how the future is used.
The subclass method has memory management issues as can be noticed in the test suite by having to keep Work instances alive by storing them in PG fields.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/79176
Approved by: https://github.com/rohan-varma
Summary:
This patch makes broadcast as a custom op such that it's dispatcher
passable. It's one part of the effort to route comm ops to the dispatcher
such that tracing mechanisms that relies on the dispatcher can trace them,
e.g., LazyTensor and AOTAutograd.
Test Plan:
python test/distributed/test_c10d_nccl.py -k test_broadcast_ops
python test/distributed/test_c10d_gloo.py -k test_broadcast_basics
...and other existing distributed tests.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/76722
Approved by: https://github.com/pritamdamania87
Summary:
`batch_isend_irecv` previously only worked for two-rank cases,
otherwise it would hang, e.g. pytorch/pytorch#73960. This Diff extends
`batch_isend_irecv` to support more than two ranks. The fix is by treating
the operation more like a collective rather than two-rank P2P when selecting
the communicator, since there can be more ranks participating in the batch call than "my" rank and "my" peer.
Rules:
- If `batch_isend_irecv` is the first collective call (including collectives and
all-to-all) in the `group` given as the argument, then all ranks of the
`group` are expected to participate in this call.
- Otherwise, if it is not the first collective call in the `group` (i.e. the
communicator has been initialized), then batched P2P communication involving
only subset of processes of the `group` is allowed.
Test Plan:
Added p2p_tests.py testing the following patterns:
+ sendrecv_neighbor(input, output) # Ring like neighbor exchange
+ sendrecv_ripple(input, output) # Exchange with growing distance (pytorch/pytorch#73960)
+ sendrecv_P2P(input, output) # Single P2P operation
+ isendrecv_P2P(input, output) # Single non-blocking P2P operation
+ isendrecv_P2P_batch(input, output, 0) # batched P2P between only two ranks
+ isendrecv_P2P_batch(input, output, 1) # batched P2P within a new group created for two ranks
Differential Revision: D35122664
Pull Request resolved: https://github.com/pytorch/pytorch/pull/74701
Approved by: https://github.com/mingzhe09088, https://github.com/osalpekar
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/73702
This PR adds logging to track usage of ProcessGroup backends. The
change involves adding an `init()` method to the ProcessGroup base class and
modifying all the subclasses to call it.
We can't add logging directly in the ProcessGroup constructor since
`getBackendName()` is pure virtual and can't be called from the base class
constructor. See
https://isocpp.org/wiki/faq/strange-inheritance#calling-virtuals-from-ctors for
more details.
ghstack-source-id: 150381852
Test Plan: check logging
Reviewed By: cbalioglu
Differential Revision: D34596295
fbshipit-source-id: 5baa7bfb53bdcd1b4032292c4e7715467f983288
(cherry picked from commit b2344144fe0c25db02c2e958f2acd772e9cd4dc8)