Add networkx as a dependency for test_bazel
Example failure: https://github.com/pytorch/pytorch/actions/runs/12551752021/job/34996706301
```
INFO: From Testing //:test_bazel:
==================== Test output for //:test_bazel:
Traceback (most recent call last):
File "/var/lib/jenkins/.cache/bazel/_bazel_jenkins/fdf6d09bf4b4f04a71e2a7dfceb40620/sandbox/processwrapper-sandbox/6504/execroot/pytorch/bazel-out/k8-fastbuild/bin/test_bazel.runfiles/pytorch/test/_test_bazel.py", line 33, in <module>
test_simple_compile_eager()
File "/var/lib/jenkins/.cache/bazel/_bazel_jenkins/fdf6d09bf4b4f04a71e2a7dfceb40620/sandbox/processwrapper-sandbox/6504/execroot/pytorch/bazel-out/k8-fastbuild/bin/test_bazel.runfiles/pytorch/test/_test_bazel.py", line 27, in test_simple_compile_eager
opt_foo1 = torch.compile(foo, backend="eager")
File "/var/lib/jenkins/.cache/bazel/_bazel_jenkins/fdf6d09bf4b4f04a71e2a7dfceb40620/sandbox/processwrapper-sandbox/6504/execroot/pytorch/bazel-out/k8-fastbuild/bin/test_bazel.runfiles/pytorch/torch/__init__.py", line 2533, in compile
backend = _TorchCompileWrapper(backend, mode, options, dynamic)
File "/var/lib/jenkins/.cache/bazel/_bazel_jenkins/fdf6d09bf4b4f04a71e2a7dfceb40620/sandbox/processwrapper-sandbox/6504/execroot/pytorch/bazel-out/k8-fastbuild/bin/test_bazel.runfiles/pytorch/torch/__init__.py", line 2342, in __init__
self.compiler_fn = lookup_backend(backend)
File "/var/lib/jenkins/.cache/bazel/_bazel_jenkins/fdf6d09bf4b4f04a71e2a7dfceb40620/sandbox/processwrapper-sandbox/6504/execroot/pytorch/bazel-out/k8-fastbuild/bin/test_bazel.runfiles/pytorch/torch/_dynamo/backends/registry.py", line 66, in lookup_backend
_lazy_import()
File "/var/lib/jenkins/.cache/bazel/_bazel_jenkins/fdf6d09bf4b4f04a71e2a7dfceb40620/sandbox/processwrapper-sandbox/6504/execroot/pytorch/bazel-out/k8-fastbuild/bin/test_bazel.runfiles/pytorch/torch/_dynamo/backends/registry.py", line 102, in _lazy_import
import_submodule(backends)
File "/var/lib/jenkins/.cache/bazel/_bazel_jenkins/fdf6d09bf4b4f04a71e2a7dfceb40620/sandbox/processwrapper-sandbox/6504/execroot/pytorch/bazel-out/k8-fastbuild/bin/test_bazel.runfiles/pytorch/torch/_dynamo/utils.py", line 2797, in import_submodule
importlib.import_module(f"{mod.__name__}.{filename[:-3]}")
File "/var/lib/jenkins/.cache/bazel/_bazel_jenkins/fdf6d09bf4b4f04a71e2a7dfceb40620/execroot/pytorch/external/python3_10_x86_64-unknown-linux-gnu/lib/python3.10/importlib/__init__.py", line 126, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "<frozen importlib._bootstrap>", line 1050, in _gcd_import
File "<frozen importlib._bootstrap>", line 1027, in _find_and_load
File "<frozen importlib._bootstrap>", line 1006, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 688, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 883, in exec_module
File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
File "/var/lib/jenkins/.cache/bazel/_bazel_jenkins/fdf6d09bf4b4f04a71e2a7dfceb40620/sandbox/processwrapper-sandbox/6504/execroot/pytorch/bazel-out/k8-fastbuild/bin/test_bazel.runfiles/pytorch/torch/_dynamo/backends/common.py", line 12, in <module>
from torch._functorch.aot_autograd import (
File "/var/lib/jenkins/.cache/bazel/_bazel_jenkins/fdf6d09bf4b4f04a71e2a7dfceb40620/sandbox/processwrapper-sandbox/6504/execroot/pytorch/bazel-out/k8-fastbuild/bin/test_bazel.runfiles/pytorch/torch/_functorch/aot_autograd.py", line 147, in <module>
from .partitioners import default_partition
File "/var/lib/jenkins/.cache/bazel/_bazel_jenkins/fdf6d09bf4b4f04a71e2a7dfceb40620/sandbox/processwrapper-sandbox/6504/execroot/pytorch/bazel-out/k8-fastbuild/bin/test_bazel.runfiles/pytorch/torch/_functorch/partitioners.py", line 31, in <module>
from ._activation_checkpointing.graph_info_provider import GraphInfoProvider
File "/var/lib/jenkins/.cache/bazel/_bazel_jenkins/fdf6d09bf4b4f04a71e2a7dfceb40620/sandbox/processwrapper-sandbox/6504/execroot/pytorch/bazel-out/k8-fastbuild/bin/test_bazel.runfiles/pytorch/torch/_functorch/_activation_checkpointing/graph_info_provider.py", line 3, in <module>
import networkx as nx
ModuleNotFoundError: No module named 'networkx'
```
No periodic runs on this PR or its main branch commit, but I'm pretty sure its started on https://togithub.com/pytorch/pytorch/pull/143539
Pull Request resolved: https://github.com/pytorch/pytorch/pull/143995
Approved by: https://github.com/huydhn
Description:
1. Quantize Linear Layer Weights to 4-bits:
Quantize the weights of the Linear layer to 4 bits, using symmetric quantization.
Pack two 4-bit weights into one uint8 container.
Choose a quantization scheme (channel-wise or group-wise), with the group size being a multiple of 32.
2. Prepare Quantized Weights, Scales, and Optional Bias:
After quantizing, obtain the quantized_weights, scales, and groupsize.
If the original Linear layer has a bias, prepare it as well.
3. Pack the Weights Efficiently:
Use torch.ops.aten._dyn_quant_pack_4bit_weight to optimally pack the weights, scales, and optional bias.
```python
packed_weights = torch.ops.aten._dyn_quant_pack_4bit_weight(weight, scales_and_zeros, bias, groupsize, in_features, out_features)
```
Input parameters should include:
in_features and out_features (the same as the Linear layer’s corresponding parameters).
4. Perform Dynamic Quantized Matrix Multiplication:
Use torch.ops.aten._dyn_quant_matmul_4bit to perform matrix multiplication with quantized weights.
```python
output = torch.ops.aten._dyn_quant_matmul_4bit(input, packed_weights, groupsize, in_features, out_features)
```
Inputs required include:
The input tensor, packed_weights , groupsize, and the in_features and out_features.
API Usage: https://github.com/pytorch/pytorch/issues/143289
Model Perf :
7B Transformer model:
Prefill : 340 t/s
Decode : 40 t/s
2B Transformer model
Prefill : 747 t/s
Decode : 80 t/s
Tests:
python test/test_linalg.py -k test__dyn_quant_pack_4bit_weight
Ran 1 test in 0.016s
OK
python test/test_linalg.py -k test__dyn_quant_matmul_4bit
Ran 8 tests in 0.077s
OK
python test/test_linalg.py -k test_compile_dyn_quant_matmul_4bit
Ran 8 tests in 11.454s
Change-Id: Ia1672bad5e6ec94e64d8bb1971395d60f4b3a452
Fixes #ISSUE_NUMBER
Pull Request resolved: https://github.com/pytorch/pytorch/pull/134124
Approved by: https://github.com/digantdesai, https://github.com/malfet
Description:
1. Quantize Linear Layer Weights to 4-bits:
Quantize the weights of the Linear layer to 4 bits, using symmetric quantization.
Pack two 4-bit weights into one uint8 container.
Choose a quantization scheme (channel-wise or group-wise), with the group size being a multiple of 32.
2. Prepare Quantized Weights, Scales, and Optional Bias:
After quantizing, obtain the quantized_weights, scales, and groupsize.
If the original Linear layer has a bias, prepare it as well.
3. Pack the Weights Efficiently:
Use torch.ops.aten._dyn_quant_pack_4bit_weight to optimally pack the weights, scales, and optional bias.
```python
packed_weights = torch.ops.aten._dyn_quant_pack_4bit_weight(weight, scales_and_zeros, bias, groupsize, in_features, out_features)
```
Input parameters should include:
in_features and out_features (the same as the Linear layer’s corresponding parameters).
4. Perform Dynamic Quantized Matrix Multiplication:
Use torch.ops.aten._dyn_quant_matmul_4bit to perform matrix multiplication with quantized weights.
```python
output = torch.ops.aten._dyn_quant_matmul_4bit(input, packed_weights, groupsize, in_features, out_features)
```
Inputs required include:
The input tensor, packed_weights , groupsize, and the in_features and out_features.
API Usage: https://github.com/pytorch/pytorch/issues/143289
Model Perf :
7B Transformer model:
Prefill : 340 t/s
Decode : 40 t/s
2B Transformer model
Prefill : 747 t/s
Decode : 80 t/s
Tests:
python test/test_linalg.py -k test__dyn_quant_pack_4bit_weight
Ran 1 test in 0.016s
OK
python test/test_linalg.py -k test__dyn_quant_matmul_4bit
Ran 8 tests in 0.077s
OK
python test/test_linalg.py -k test_compile_dyn_quant_matmul_4bit
Ran 8 tests in 11.454s
Change-Id: Ia1672bad5e6ec94e64d8bb1971395d60f4b3a452
Fixes #ISSUE_NUMBER
Pull Request resolved: https://github.com/pytorch/pytorch/pull/134124
Approved by: https://github.com/digantdesai, https://github.com/malfet
Description:
1. Quantize Linear Layer Weights to 4-bits:
Quantize the weights of the Linear layer to 4 bits, using symmetric quantization.
Pack two 4-bit weights into one uint8 container.
Choose a quantization scheme (channel-wise or group-wise), with the group size being a multiple of 32.
2. Prepare Quantized Weights, Scales, and Optional Bias:
After quantizing, obtain the quantized_weights, scales, and groupsize.
If the original Linear layer has a bias, prepare it as well.
3. Pack the Weights Efficiently:
Use torch.ops.aten._dyn_quant_pack_4bit_weight to optimally pack the weights, scales, and optional bias.
```python
packed_weights = torch.ops.aten._dyn_quant_pack_4bit_weight(weight, scales_and_zeros, bias, groupsize, in_features, out_features)
```
Input parameters should include:
in_features and out_features (the same as the Linear layer’s corresponding parameters).
4. Perform Dynamic Quantized Matrix Multiplication:
Use torch.ops.aten._dyn_quant_matmul_4bit to perform matrix multiplication with quantized weights.
```python
output = torch.ops.aten._dyn_quant_matmul_4bit(input, packed_weights, groupsize, in_features, out_features)
```
Inputs required include:
The input tensor, packed_weights , groupsize, and the in_features and out_features.
API Usage: https://github.com/pytorch/pytorch/issues/143289
Model Perf :
7B Transformer model:
Prefill : 340 t/s
Decode : 40 t/s
2B Transformer model
Prefill : 747 t/s
Decode : 80 t/s
Tests:
python test/test_linalg.py -k test__dyn_quant_pack_4bit_weight
Ran 1 test in 0.016s
OK
python test/test_linalg.py -k test__dyn_quant_matmul_4bit
Ran 8 tests in 0.077s
OK
python test/test_linalg.py -k test_compile_dyn_quant_matmul_4bit
Ran 8 tests in 11.454s
Change-Id: Ia1672bad5e6ec94e64d8bb1971395d60f4b3a452
Fixes #ISSUE_NUMBER
Pull Request resolved: https://github.com/pytorch/pytorch/pull/134124
Approved by: https://github.com/digantdesai, https://github.com/malfet
This PR introduces the following:
### torch.ops.symm_mem._async_input_mm
`_async_input_mm(Tensor a, Tensor b, Tensor a_chunk_signals, int a_chunk_pivot) -> Tensor`
An mm impl that supports consuming asynchronous input. It guarantees the following rasterization order, and that the corresponding signal arrives before an input chunk is consumed.
```
num_chunks = a_chunks_signals.numel()
for chunk_idx in range(a_chunk_pivot, num_chunks + a_chunk_pivot):
chunk_idx = chunk_idx % num_chunks
wait_signal(a_chunk_signals, chunk_idx)
# Compute output tiles that consumes the input chunk
```
### PersistentAsyncInputScheduler
This is a forked version of PersistentScheduler that supports consuming asynchronous input. This tile scheduler introduces the following arguments:
- `tiles_per_chunk_m` – Specifies the size of an M chunk. Chunks are the granularity at which the asynchronous input becomes ready. It must be an interger multiple of the size of an M tile.
- `chunk_signals` – `chunk_signals[i] == 1` indicates that chunk i is ready. Before returning a work tile, get_current_work() waits for the signal to ensure that the corresponding chunk is ready.
- `tile_idx_pivot_m` – After applying swizzling, apply `pivot(m) => (m + tile_idx_pivot_m) % tiles_m` to `m`. In a distributed setting, this allows different ranks to process different m indices at the same time, thus avoiding communication hotspots.
Note that this scheduler currently only supports the `KernelTmaWarpSpecializedCooperative` kernel schedule. This is enforced via the template argument `KernelSchedule`.
Usage:
```
using GemmKernel = cutlass::gemm::kernel::GemmUniversal<
Shape<int, int, int, int>,
CollectiveMainloop,
CollectiveEpilogue,
cutlass::gemm::PersistentAsyncInputScheduler<KernelSchedule>>;
```
### _fused_all_gather_matmul_native
An ag-mm impl that combines `torch.ops.symm_mem._async_input_mm` and progress-aware all-gather. This is not yet enabled via the async-tp passes. We will use it as a backend to optimize the current decomposition-based async-tp impl.
## Benchmarks
### 4096x3584x8192
- cublas + nccl: 539us
- decomp-based async-tp w/o cuda graph: 694us
- decomp-based async-tp w/ cuda graph: 478us
- new cutlass kernel: 408us
<img width="478" alt="image" src="https://github.com/user-attachments/assets/39f316ab-36c5-4b41-af77-07854a385dfc">
### 2048x3584x8192
- cublas + nccl: 301us
- decomp-based async-tp w/o cuda graph: 687us
- decomp-based async-tp w/ cuda graph: 356us
- new cutlass kernel: 276us
<img width="441" alt="image" src="https://github.com/user-attachments/assets/9e23ce21-863b-43dd-a562-fb05d3a5a144">
## Next Steps
- Add tuning logic
- Use `_fused_all_gather_matmul_native` as a backend for the decomp-based async-tp impl
Differential temp Revision: [D65623152](https://our.internmc.facebook.com/intern/diff/D65623152)
Pull Request resolved: https://github.com/pytorch/pytorch/pull/139227
Approved by: https://github.com/weifengpy, https://github.com/Chillee
This PR introduces the following:
### torch.ops.symm_mem._async_input_mm
`_async_input_mm(Tensor a, Tensor b, Tensor a_chunk_signals, int a_chunk_pivot) -> Tensor`
An mm impl that supports consuming asynchronous input. It guarantees the following rasterization order, and that the corresponding signal arrives before an input chunk is consumed.
```
num_chunks = a_chunks_signals.numel()
for chunk_idx in range(a_chunk_pivot, num_chunks + a_chunk_pivot):
chunk_idx = chunk_idx % num_chunks
wait_signal(a_chunk_signals, chunk_idx)
# Compute output tiles that consumes the input chunk
```
### PersistentAsyncInputScheduler
This is a forked version of PersistentScheduler that supports consuming asynchronous input. This tile scheduler introduces the following arguments:
- `tiles_per_chunk_m` – Specifies the size of an M chunk. Chunks are the granularity at which the asynchronous input becomes ready. It must be an interger multiple of the size of an M tile.
- `chunk_signals` – `chunk_signals[i] == 1` indicates that chunk i is ready. Before returning a work tile, get_current_work() waits for the signal to ensure that the corresponding chunk is ready.
- `tile_idx_pivot_m` – After applying swizzling, apply `pivot(m) => (m + tile_idx_pivot_m) % tiles_m` to `m`. In a distributed setting, this allows different ranks to process different m indices at the same time, thus avoiding communication hotspots.
Note that this scheduler currently only supports the `KernelTmaWarpSpecializedCooperative` kernel schedule. This is enforced via the template argument `KernelSchedule`.
Usage:
```
using GemmKernel = cutlass::gemm::kernel::GemmUniversal<
Shape<int, int, int, int>,
CollectiveMainloop,
CollectiveEpilogue,
cutlass::gemm::PersistentAsyncInputScheduler<KernelSchedule>>;
```
### _fused_all_gather_matmul_native
An ag-mm impl that combines `torch.ops.symm_mem._async_input_mm` and progress-aware all-gather. This is not yet enabled via the async-tp passes. We will use it as a backend to optimize the current decomposition-based async-tp impl.
## Benchmarks
### 4096x3584x8192
- cublas + nccl: 539us
- decomp-based async-tp w/o cuda graph: 694us
- decomp-based async-tp w/ cuda graph: 478us
- new cutlass kernel: 408us
<img width="478" alt="image" src="https://github.com/user-attachments/assets/39f316ab-36c5-4b41-af77-07854a385dfc">
### 2048x3584x8192
- cublas + nccl: 301us
- decomp-based async-tp w/o cuda graph: 687us
- decomp-based async-tp w/ cuda graph: 356us
- new cutlass kernel: 276us
<img width="441" alt="image" src="https://github.com/user-attachments/assets/9e23ce21-863b-43dd-a562-fb05d3a5a144">
## Next Steps
- Add tuning logic
- Use `_fused_all_gather_matmul_native` as a backend for the decomp-based async-tp impl
Pull Request resolved: https://github.com/pytorch/pytorch/pull/139227
Approved by: https://github.com/weifengpy, https://github.com/Chillee
So that the tensor's lifetime management is the same as the management built for the NCCL, pre and post kernels.
Also so that on visualizers, they show up in the NCCL stream line. Otherwise if they show up in the compute line, user may get confused (my code does not have these kernels).
The check is thus moved after the point where we depend NCCL stream from the last compute kernel.
Also moved declaration of `checkForNan` from Utils.hpp to NCCLUtils.hpp, and renamed Utils.cu to NCCLUtils.cu.
Differential Revision: [D61957573](https://our.internmc.facebook.com/intern/diff/D61957573)
Pull Request resolved: https://github.com/pytorch/pytorch/pull/134300
Approved by: https://github.com/shuqiangzhang, https://github.com/wconstab
So that the tensor's lifetime management is the same as the management built for the NCCL, pre and post kernels.
Also so that on visualizers, they show up in the NCCL stream line. Otherwise if they show up in the compute line, user may get confused (my code does not have these kernels).
The check is thus moved after the point where we depend NCCL stream from the last compute kernel.
Also moved declaration of `checkForNan` from Utils.hpp to NCCLUtils.hpp, and renamed Utils.cu to NCCLUtils.cu.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/134300
Approved by: https://github.com/shuqiangzhang, https://github.com/wconstab
Summary:
Expose nlohmann json library so that it can be used from inside Pytorch. The library already exists in the `third_party` directory. This PR is making `nlohmann/json.hpp` header available to be used from `torch.distributed`.
The next PR makes actual use of this header.
imported-using-ghimport
Test Plan: Imported from OSS
Reviewed By: malfet
Differential Revision: D59035246
Pulled By: c-p-i-o
Pull Request resolved: https://github.com/pytorch/pytorch/pull/129570
Approved by: https://github.com/d4l3k, https://github.com/malfet
Stack from [ghstack](https://github.com/ezyang/ghstack) (oldest at bottom):
This PR introduces a prototype for `SymmetricMemory` (including a CUDA implementation) - a remote-memory access-based communication primitive. It allows for user-defined communication patterns/kernels and is designed to be torch.compile-friendly. It addresses the major limitations of `IntraNodeComm` and `ProcessGroupCudaP2p` and serves as a replacement for them.
### SymmetricMemory
`SymmetricMemory` represents symmetric allocations across a group of devices. The allocations represented by a `SymmetricMemory` object are accessible by all devices in the group. The class can be used for **op-level custom communication patterns** (via the get_buffer APIs and the synchronization primitives), as well as **custom communication kernels** (via the buffer and signal_pad device pointers).
### Python API Example
```python
from torch._C.distributed_c10d import _SymmetricMemory
# Set a store for rendezvousing symmetric allocations on a group of devices
# identified by group_name. The concept of groups is logical; users can
# utilize predefined groups (e.g., a group of device identified by a
# ProcessGroup) or create custom ones. Note that a SymmetricMemoryAllocator
# backends might employ a more efficient communication channel for the actual
# rendezvous process and only use the store for bootstrapping purposes.
_SymmetricMemory.set_group_info(group_name, rank, world_size, store)
# Identical to empty_strided, but allows symmetric memory access to be
# established for the allocated tensor via _SymmetricMemory.rendezvous().
# This function itself is not a collective operation.
t = _SymmetricMemory.empty_strided_p2p((64, 64), (64, 1), torch.float32, group_name)
# Users can write Python custom ops that leverages the symmetric memory access.
# Below are examples of things users can do (assuming the group's world_size is 2).
# Establishes symmetric memory access on tensors allocated via
# _SymmetricMemory.empty_strided_p2p(). rendezvous() is a one-time process,
# and the mapping between a local memory region and the associated SymmetricMemory
# object is unique. Subsequent calls to rendezvous() with the same tensor will receive
# the cached SymmetricMemory object.
#
# The function has a collective semantic and must be invoked simultaneously
# from all rendezvous participants.
symm_mem = _SymmetricMemory.rendezvous(t)
# This represents the allocation on rank 0 and is accessible from all devices.
buf = symm_mem.get_buffer(0, (64, 64), torch.float32)
if symm_mem.rank == 0:
symm_mem.wait_signal(src_rank=1)
assert buf.eq(42).all()
else:
# The remote buffer can be used as a regular tensor
buf.fill_(42)
symm_mem.put_signal(dst_rank=0)
symm_mem.barrier()
if symm_mem.rank == 0:
symm_mem.barrier()
assert buf.eq(43).all()
else:
new_val = torch.empty_like(buf)
new_val.fill_(43)
# Contiguous copies to/from a remote buffer utilize copy engines
# which bypasses SMs (i.e. no need to load the data into registers)
buf.copy_(new_val)
symm_mem.barrier()
```
### Custom CUDA Comm Kernels
Given a tensor, users can access the associated `SymmetricMemory` which provides pointer to remote buffers/signal_pads needed for custom communication kernels.
```cpp
TORCH_API c10::intrusive_ptr<SymmetricMemory> get_symmetric_memory(
const at::Tensor& tensor);
class TORCH_API SymmetricMemory : public c10::intrusive_ptr_target {
public:
...
virtual std::vector<void*> get_buffer_ptrs() = 0;
virtual std::vector<void*> get_signal_pad_ptrs() = 0;
virtual void** get_buffer_ptrs_dev() = 0;
virtual void** get_signal_pad_ptrs_dev() = 0;
virtual size_t get_buffer_size() = 0;
virtual size_t get_signal_pad_size() = 0;
virtual int get_rank() = 0;
virtual int get_world_size() = 0;
...
};
```
### Limitations of IntraNodeComm and ProcessGroupCudaP2p
Both `IntraNodeComm` (used by `ProcessGroupCudaP2p`) manages a single fixed-size workspace. This approach:
- Leads to awkward UX in which the required workspace needs to be specified upfront.
- Can not avoid extra copies for some algorithms in eager mode (e.g., custom/multimem all-reduce, reduce-scatter, all-gather).
- Prevents torch.compile from eliminating all copies.
In addition, they only offer out-of-the-box communication kernels and don't expose required pointers for user-defined, custom CUDA comm kernels.
* __->__ #128582
Differential Revision: [D58849033](https://our.internmc.facebook.com/intern/diff/D58849033)
Pull Request resolved: https://github.com/pytorch/pytorch/pull/128582
Approved by: https://github.com/wanchaol
Stack from [ghstack](https://github.com/ezyang/ghstack) (oldest at bottom):
This PR introduces a prototype for `SymmetricMemory` (including a CUDA implementation) - a remote-memory access-based communication primitive. It allows for user-defined communication patterns/kernels and is designed to be torch.compile-friendly. It addresses the major limitations of `IntraNodeComm` and `ProcessGroupCudaP2p` and serves as a replacement for them.
### SymmetricMemory
`SymmetricMemory` represents symmetric allocations across a group of devices. The allocations represented by a `SymmetricMemory` object are accessible by all devices in the group. The class can be used for **op-level custom communication patterns** (via the get_buffer APIs and the synchronization primitives), as well as **custom communication kernels** (via the buffer and signal_pad device pointers).
### Python API Example
```python
from torch._C.distributed_c10d import _SymmetricMemory
# Set a store for rendezvousing symmetric allocations on a group of devices
# identified by group_name. The concept of groups is logical; users can
# utilize predefined groups (e.g., a group of device identified by a
# ProcessGroup) or create custom ones. Note that a SymmetricMemoryAllocator
# backends might employ a more efficient communication channel for the actual
# rendezvous process and only use the store for bootstrapping purposes.
_SymmetricMemory.set_group_info(group_name, rank, world_size, store)
# Identical to empty_strided, but allows symmetric memory access to be
# established for the allocated tensor via _SymmetricMemory.rendezvous().
# This function itself is not a collective operation.
t = _SymmetricMemory.empty_strided_p2p((64, 64), (64, 1), torch.float32, group_name)
# Users can write Python custom ops that leverages the symmetric memory access.
# Below are examples of things users can do (assuming the group's world_size is 2).
# Establishes symmetric memory access on tensors allocated via
# _SymmetricMemory.empty_strided_p2p(). rendezvous() is a one-time process,
# and the mapping between a local memory region and the associated SymmetricMemory
# object is unique. Subsequent calls to rendezvous() with the same tensor will receive
# the cached SymmetricMemory object.
#
# The function has a collective semantic and must be invoked simultaneously
# from all rendezvous participants.
symm_mem = _SymmetricMemory.rendezvous(t)
# This represents the allocation on rank 0 and is accessible from all devices.
buf = symm_mem.get_buffer(0, (64, 64), torch.float32)
if symm_mem.rank == 0:
symm_mem.wait_signal(src_rank=1)
assert buf.eq(42).all()
else:
# The remote buffer can be used as a regular tensor
buf.fill_(42)
symm_mem.put_signal(dst_rank=0)
symm_mem.barrier()
if symm_mem.rank == 0:
symm_mem.barrier()
assert buf.eq(43).all()
else:
new_val = torch.empty_like(buf)
new_val.fill_(43)
# Contiguous copies to/from a remote buffer utilize copy engines
# which bypasses SMs (i.e. no need to load the data into registers)
buf.copy_(new_val)
symm_mem.barrier()
```
### Custom CUDA Comm Kernels
Given a tensor, users can access the associated `SymmetricMemory` which provides pointer to remote buffers/signal_pads needed for custom communication kernels.
```cpp
TORCH_API c10::intrusive_ptr<SymmetricMemory> get_symmetric_memory(
const at::Tensor& tensor);
class TORCH_API SymmetricMemory : public c10::intrusive_ptr_target {
public:
...
virtual std::vector<void*> get_buffer_ptrs() = 0;
virtual std::vector<void*> get_signal_pad_ptrs() = 0;
virtual void** get_buffer_ptrs_dev() = 0;
virtual void** get_signal_pad_ptrs_dev() = 0;
virtual size_t get_buffer_size() = 0;
virtual size_t get_signal_pad_size() = 0;
virtual int get_rank() = 0;
virtual int get_world_size() = 0;
...
};
```
### Limitations of IntraNodeComm and ProcessGroupCudaP2p
Both `IntraNodeComm` (used by `ProcessGroupCudaP2p`) manages a single fixed-size workspace. This approach:
- Leads to awkward UX in which the required workspace needs to be specified upfront.
- Can not avoid extra copies for some algorithms in eager mode (e.g., custom/multimem all-reduce, reduce-scatter, all-gather).
- Prevents torch.compile from eliminating all copies.
In addition, they only offer out-of-the-box communication kernels and don't expose required pointers for user-defined, custom CUDA comm kernels.
* __->__ #128582
Pull Request resolved: https://github.com/pytorch/pytorch/pull/128582
Approved by: https://github.com/wanchaol
Stack from [ghstack](https://github.com/ezyang/ghstack) (oldest at bottom):
This PR introduces a prototype for `SymmetricMemory` (including a CUDA implementation) - a remote-memory access-based communication primitive. It allows for user-defined communication patterns/kernels and is designed to be torch.compile-friendly. It addresses the major limitations of `IntraNodeComm` and `ProcessGroupCudaP2p` and serves as a replacement for them.
### SymmetricMemory
`SymmetricMemory` represents symmetric allocations across a group of devices. The allocations represented by a `SymmetricMemory` object are accessible by all devices in the group. The class can be used for **op-level custom communication patterns** (via the get_buffer APIs and the synchronization primitives), as well as **custom communication kernels** (via the buffer and signal_pad device pointers).
### Python API Example
```python
from torch._C.distributed_c10d import _SymmetricMemory
# Set a store for rendezvousing symmetric allocations on a group of devices
# identified by group_name. The concept of groups is logical; users can
# utilize predefined groups (e.g., a group of device identified by a
# ProcessGroup) or create custom ones. Note that a SymmetricMemoryAllocator
# backends might employ a more efficient communication channel for the actual
# rendezvous process and only use the store for bootstrapping purposes.
_SymmetricMemory.set_group_info(group_name, rank, world_size, store)
# Identical to empty_strided, but allows symmetric memory access to be
# established for the allocated tensor via _SymmetricMemory.rendezvous().
# This function itself is not a collective operation.
t = _SymmetricMemory.empty_strided_p2p((64, 64), (64, 1), torch.float32, group_name)
# Users can write Python custom ops that leverages the symmetric memory access.
# Below are examples of things users can do (assuming the group's world_size is 2).
# Establishes symmetric memory access on tensors allocated via
# _SymmetricMemory.empty_strided_p2p(). rendezvous() is a one-time process,
# and the mapping between a local memory region and the associated SymmetricMemory
# object is unique. Subsequent calls to rendezvous() with the same tensor will receive
# the cached SymmetricMemory object.
#
# The function has a collective semantic and must be invoked simultaneously
# from all rendezvous participants.
symm_mem = _SymmetricMemory.rendezvous(t)
# This represents the allocation on rank 0 and is accessible from all devices.
buf = symm_mem.get_buffer(0, (64, 64), torch.float32)
if symm_mem.rank == 0:
symm_mem.wait_signal(src_rank=1)
assert buf.eq(42).all()
else:
# The remote buffer can be used as a regular tensor
buf.fill_(42)
symm_mem.put_signal(dst_rank=0)
symm_mem.barrier()
if symm_mem.rank == 0:
symm_mem.barrier()
assert buf.eq(43).all()
else:
new_val = torch.empty_like(buf)
new_val.fill_(43)
# Contiguous copies to/from a remote buffer utilize copy engines
# which bypasses SMs (i.e. no need to load the data into registers)
buf.copy_(new_val)
symm_mem.barrier()
```
### Custom CUDA Comm Kernels
Given a tensor, users can access the associated `SymmetricMemory` which provides pointer to remote buffers/signal_pads needed for custom communication kernels.
```cpp
TORCH_API c10::intrusive_ptr<SymmetricMemory> get_symmetric_memory(
const at::Tensor& tensor);
class TORCH_API SymmetricMemory : public c10::intrusive_ptr_target {
public:
...
virtual std::vector<void*> get_buffer_ptrs() = 0;
virtual std::vector<void*> get_signal_pad_ptrs() = 0;
virtual void** get_buffer_ptrs_dev() = 0;
virtual void** get_signal_pad_ptrs_dev() = 0;
virtual size_t get_buffer_size() = 0;
virtual size_t get_signal_pad_size() = 0;
virtual int get_rank() = 0;
virtual int get_world_size() = 0;
...
};
```
### Limitations of IntraNodeComm and ProcessGroupCudaP2p
Both `IntraNodeComm` (used by `ProcessGroupCudaP2p`) manages a single fixed-size workspace. This approach:
- Leads to awkward UX in which the required workspace needs to be specified upfront.
- Can not avoid extra copies for some algorithms in eager mode (e.g., custom/multimem all-reduce, reduce-scatter, all-gather).
- Prevents torch.compile from eliminating all copies.
In addition, they only offer out-of-the-box communication kernels and don't expose required pointers for user-defined, custom CUDA comm kernels.
* __->__ #128582
Pull Request resolved: https://github.com/pytorch/pytorch/pull/128582
Approved by: https://github.com/wanchaol