**Summary:** When operations are done on partial placements, we use sharding logic to incorrectly determine whether we should redistribute the tensor to replicate. By delaying the redistribution, we do the operation first, and then the partial reduction. This leads to incorrect results for max, min, gradient norm clipping, and more. We solve this by setting reduction_linear to False when there is a Partial placement to force the redistribution before completing the op.
**Test Cases**
1. pytest test/distributed/tensor/test_math_ops.py -k test_partial_reduction_ops
2. pytest test/distributed/tensor/test_math_ops.py -k test_matching_partial_reduction_ops
Pull Request resolved: https://github.com/pytorch/pytorch/pull/165962
Approved by: https://github.com/wconstab
- During op dispatch local tensor is supposed to collect rng state from CPU and CUDA
devices so that it can be reset before execution of the op for each such that ops
with randomness produces the same result for all ranks (note that we are planning a
separate change to add support of per rank rng state). Previously we relied on
op input arguments to deduce which devices to get rng state from. Which doesn't work
for factory functions such torch.randn. Hence this changes switches to uncondionally
collecting rng state from all devices.
- Fixing per rank specific computations in _MaskedPartial and Shard placements discovered
during test enablement.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/165716
Approved by: https://github.com/ezyang
- During op dispatch local tensor is supposed to collect rng state from CPU and CUDA
devices so that it can be reset before execution of the op for each such that ops
with randomness produces the same result for all ranks (note that we are planning a
separate change to add support of per rank rng state). Previously we relied on
op input arguments to deduce which devices to get rng state from. Which doesn't work
for factory functions such torch.randn. Hence this changes switches to uncondionally
collecting rng state from all devices.
- Fixing per rank specific computations in _MaskedPartial and Shard placements discovered
during test enablement.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/165716
Approved by: https://github.com/ezyang
Current implementation hardcodes 4D input and output tensor shapes
Change that by computing `output_conv_shape` for any number of input dims
Replace `[.., .., .., slice]` with `[..., slice]`
Pull Request resolved: https://github.com/pytorch/pytorch/pull/165241
Approved by: https://github.com/ezyang
A LocalTensor is a tensor subclass which simulates a tensor that is
distributed across SPMD ranks. A LocalTensor might be size N, but in fact
there are world_size shards/replicas of it stored internally. When you do a
plain PyTorch operation on it, we apply the operation to each shard; when you
do a collective, we do the mathematically equivalent operation on the local
shards. A LocalTensor is associated with a list of ranks which specify
which ranks it holds local tensors for.
NB, this is NOT a DataParallel like abstraction where you can run operations
on multiple different GPUs. It is intended purely for *debugging* purposes,
the overhead is almost certainly too high to keep eight GPUs (even the C++
autograd needs multithreading to keep up!) (It might potentially be possible
to trace through this with torch.compile and then compile it with CUDA graphs
but this is currently a non-goal.)
In order to handle MPMD, we provide a helper decorator that allows you to
run a function with no side effects for each LocalTensor shard and combine
results back into LocalTensor or LocalIntNode.
Note: This PR convert all DTensor ops and some DTensor tests to illustrate
intended usage and ensure conrrectness. In subsequent PR more tests will be
converted. DUring test conversion we aim to share as much as possible of
test logic between multi-process / multi-threaded and local tensor tests.
We would like to developers to be able to run both flavors of the tests.
Note: This work is based on the original proposal
by @ezyang (WIP PR https://github.com/pytorch/pytorch/pull/162753).
Pull Request resolved: https://github.com/pytorch/pytorch/pull/164537
Approved by: https://github.com/ezyang
Previous uneven `_StridedShard` in https://github.com/pytorch/pytorch/pull/150490 seems failing cases like sharding `tensor = torch.arange(6)` with FSDP 2, TP 2.
This PR attempts to reinvent `_StridedShard`.
I didn't test nested `_StridedShard`, because there shouldn't be any use cases. I think it will become quite messy when it comes to **nested uneven** `_StridedShard`. We are probably going to deprecate it anyway after @zpcore 's work https://github.com/pytorch/pytorch/pull/160266 on ordered sharding, so IMO not worth it to make it too general.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/163843
Approved by: https://github.com/ezyang
Fixing issue introduced in https://github.com/pytorch/pytorch/pull/158538
where `aten.copy_.default` is registered as a pointwise op, but without linearity.
In particular, when both `src` and `dst` tensors have same `Partial` placements, direct copy should happen without redistribute, instead of redistributing both to `Replicate` before making the copy.
This was discovered from silent incorrect results e.g. on `torch.einsum` backward.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/162460
Approved by: https://github.com/zpcore
**Summary**
This PR enables in-place op `aten.squeeze_.dim` on DTensor with a change to
DTensor dispatch logic: when processing in-place operator, we should assign
`output_sharding.output_spec` back to the first argument. This is because
the in-place op_call on `arg._local_tensor` could also shift the tensor meta.
**Test**
`pytest test/distributed/tensor/test_view_ops.py -s -k test_squeeze_`
Pull Request resolved: https://github.com/pytorch/pytorch/pull/159532
Approved by: https://github.com/zpcore
Reduces collective calls in the forward pass from 2 to 1
In #158716 I added the sharding rule for the backward pass but didn't add the forward pass as it didn't get dispatched. After #159324 this should get properly dispatched hence I am adding it now.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/159692
Approved by: https://github.com/tianyu-l
**Summary**
Some thoughts on view-op and `_StridedShard` interaction:
1. `_StridedShard` has no impact on sharding (i.e. how tensor is partitioned)
compared to `Shard`. It only changes how shards permute across the devices.
2. `view()` op on DTensor strictly forbids shard redistribution which means if
`view()` may cause shard permutation across devices, it should be rejected.
This is enforced in today's sharding prop for `view()`.
3. Since DTensor `view()` won't introduce any redistribution, it's certain that
`placements` won't change except the inner `dim` attribute of `Shard`
or `_StridedShard`.
Therefore, to support `_StridedShard` in `view()` op, the only change required
is to keep `_StridedShard` as `_StridedShard` in the output spec.
**Test**
`pytest test/distributed/tensor/test_view_ops.py`
Pull Request resolved: https://github.com/pytorch/pytorch/pull/159656
Approved by: https://github.com/wconstab
- Sort strategy now supports sharding on non sorted dim.
~~- Fix histc xfail.~~
- ~~Previously `python test/distributed/tensor/test_dtensor_ops.py TestDTensorOpsCPU.test_dtensor_op_db_histc_cpu_float32` will fail with `PYTORCH_OPINFO_SAMPLE_INPUT_INDEX=18`. However, if we run `PYTORCH_OPINFO_SAMPLE_INPUT_INDEX=18 python test/distributed/tensor/test_dtensor_ops.py TestDTensorOpsCPU.test_dtensor_op_db_histc_cpu_float32`, the test will pass. This kind of error is due to DTensor reuses the strategy schema hashing. It turns out that not only the strategy, the result correctness also depends on `static_argnum` or the op will reuse the previous args from hashed schema and output wrong results. I updated the document also.~~ (fixed in https://github.com/pytorch/pytorch/pull/159289)
Pull Request resolved: https://github.com/pytorch/pytorch/pull/159189
Approved by: https://github.com/XilunWu
If `return_debug_mask` is False (which is the default value for SDPA), the attention tensor returned is an empty tensor (which has 0 dimensions). This means that the shardings for the batch and CP case are that are passed can yield invalid dimensions.
This PR fixes it for `scaled_dot_product_flash_attention_strategy`. Note that `scaled_dot_product_cudnn_attention_strategy` doen't have this issue
Pull Request resolved: https://github.com/pytorch/pytorch/pull/159205
Approved by: https://github.com/wconstab
Add `sort`, `scatter_add` strategy. I am reusing the strategy for `scatter` related ops for a quick support. The strategy can be potential improved after we fix index related strategies.
Minor fix: fix `replicate_op_strategy` to support output multiple tensors, which is required by aten.sort.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/159022
Approved by: https://github.com/XilunWu, https://github.com/wconstab
#### 1. Provide a default fallback strategy that can apply to arbitrary operator with output in type of single tensor.
We can call register_op_strategy to register using the `fallback_op_strategy`:
- For op without List[Tensor] as input, call:
```
register_op_strategy(op_overload)(replicate_op_strategy)
```
- For op contains List[Tensor] as input, call:
```
register_op_strategy(op_overload, schema_info=RuntimeSchemaInfo(needs_pytree=True))(replicate_op_strategy)
```
The strategy will force all input and output to be replicated with the corresponding redistribute_cost.
#### 2. Add a test function as a necessary condition for strategy function.
```
detect_exists_identical_opspec(*args, op, mesh, strategy_function)
```
This function detects if identical strategies will be produced given the sample `args`. It will iterate all combinations of placements for each arg and produce the output strategy from the registered `strategy_function`.
#### 3. Provide a context manger `op_strategy_context` to easily register/unregister strategies for testing.
E.g.,
```
with op_strategy_context(test_op.default, replicate_op_strategy):
...
```
#### 4. Fix a bug that TupleStrategy never get flatten as expected:
9df0176408/torch/distributed/tensor/_op_schema.py (L286)
Basically we need to 1) register_pytree_node for TupleStrategy, 2) propagate the schema_info to `strategy_schema` after `strategy_schema = _wrap_with_op_strategy(op_schema)`.
This is the first implementation. Plan to add support to enable sharding on the batch dim as the output strategy next.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/158046
Approved by: https://github.com/wanchaol, https://github.com/wconstab
The previous strategy directly used 'self' input strategy for 'src'
input. The fixed strategy correctly maps the self dim to src dim
so that it works even if the src input is broadcast.
E.g. for this program, broadcasting will occur on dims 0,1,3 of self.
```
self = torch.ones((2,3,4,5))
src = torch.ones((4,1))
self.copy_(src)
```
These are the correct sharding combinations:
| self | src |
|-------|------|
| Shard(0) | Replicate() |
| Shard(1) | Replicate() |
| Shard(2) | Shard(0) |
| Shard(3) | Shard(1) |
Pull Request resolved: https://github.com/pytorch/pytorch/pull/158538
Approved by: https://github.com/zpcore, https://github.com/XilunWu, https://github.com/wanchaol
ghstack dependencies: #158490
Fixes several bugs in the original.
- foremost, fixes a serious bug where we returned incorrect strategies
by mixing input_specs that were frozen from
select_strategy.strategies[0] with output_specs that varied across
select_strategy.strategies[0..N] (e.g. we could create a nonsense
strategy like input:Shard(0) output(Replicate) for an op like clone
- fixes the redistribute costs: they should not actually be 0, they
should be the cost of redistributing our single input from another
strategy to the current strategy, in our list of output strategies
- adds a note, wondering if we should have just literally returned the
input strategy instead of creating this new object
- Currently, using default_strategy is incorrect becuase it maps 'self'
tensor's strategies directly onto 'src' tensor without accounting for
the fact that copy_ supports broadcasting a smaller rank tensor into a
larger one.
Separates out copy_ op from default strategy, adds missing test case,
but does not fix the underlying issue with copy_, leaves that for future
PR
Renames to `propagate_single_input_strategy` since that's more
descriptive
Pull Request resolved: https://github.com/pytorch/pytorch/pull/158490
Approved by: https://github.com/wanchaol, https://github.com/XilunWu