Compare commits

...

31 Commits

Author SHA1 Message Date
5565412840 Release: v1.9.0 2025-07-16 15:36:40 +00:00
12f89bb754 do not call partial state if not initialized 2025-07-16 13:42:58 +00:00
348aabaaaf Update Gaudi runner image to latest SynapseAI and enable previously disabled tests (#3653)
* update synapse and add tp tests

* only skip regional compile speedup check

* pass sdp test on hpu
2025-07-16 14:33:36 +02:00
3b13453bbf “Stop Halving My Batch!” · Default back-off 0.5 → 0.9 (#3684)
* feat(memory): change the `find_executable_batch_size` default back-off to reduce the batch size by 10% instead of 50%

* Update test_memory_utils.py

* Apply style fixes

---------

Co-authored-by: Amit Moryossef <amitmoryossef@gmail.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
2025-07-16 12:32:46 +02:00
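For context on the back-off change above, a minimal sketch of how the `find_executable_batch_size` decorator is typically used; after this commit each out-of-memory retry shrinks the batch by 10% (multiplier 0.9) rather than halving it. The training body below is a placeholder.

```python
from accelerate.utils import find_executable_batch_size


@find_executable_batch_size(starting_batch_size=128)
def train(batch_size):
    # Build the dataloader/model with `batch_size` and run the loop here.
    # On an out-of-memory error the wrapper retries with a smaller batch:
    # previously 128 -> 64 -> 32 ..., now roughly 128 -> 115 -> 103 ...
    print(f"trying batch_size={batch_size}")


train()
```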
0408ab12d7 warn for invalid keys (#3613)
* warn for invalid keys

* add test for check_device_map invalid keys

* Apply style fixes

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
2025-07-16 12:23:41 +02:00
55e518a762 accelerate/data_loader.py: do not yield if the base_dataloader is empty (#3659)
* accelerate/data_loader.py: do not yield if the base_dataloader is empty

in the code:
```
        dataloader_iter = self.base_dataloader.__iter__()
        # We iterate one batch ahead to check when we are at the end
        try:
            current_batch = next(dataloader_iter)
        except StopIteration:
            yield
```

If the base dataloader is empty, `StopIteration` is raised and the bare `yield`
hands `None` back to the caller.

Later, when execution reaches:
```
if self.device is not None:
    current_batch = send_to_device(current_batch, self.device, non_blocking=self._non_blocking)
```

this leads to an uncaught exception like:
 File "/root/rl-swarm/.venv/lib/python3.10/site-packages/accelerate/data_loader.py", line 575, in __iter__
    current_batch = send_to_device(current_batch, self.device, non_blocking=self._non_blocking)
UnboundLocalError: local variable 'current_batch' referenced before assignment

`current_batch` was never assigned because `next(dataloader_iter)` exited with `StopIteration`.

Signed-off-by: 0xnightwind <nightwind1899@gmail.com>

* Update src/accelerate/data_loader.py

---------

Signed-off-by: 0xnightwind <nightwind1899@gmail.com>
Co-authored-by: Marc Sun <57196510+SunMarc@users.noreply.github.com>
2025-07-16 12:04:25 +02:00
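To make the failure mode described in the data_loader commit above concrete, a small self-contained sketch (not Accelerate's actual class, just the look-ahead generator pattern) showing why ending the generator is the right fix:

```python
def take_batches(loader):
    it = iter(loader)
    # Peek one batch ahead to know when we are at the end.
    try:
        first = next(it)
    except StopIteration:
        return  # fixed behaviour: end cleanly instead of a bare `yield`
    yield first
    yield from it


# With an empty loader the old code executed a bare `yield` in the `except`
# branch, handing the caller `None`; `current_batch` itself was never bound,
# so the later `send_to_device(current_batch, ...)` raised UnboundLocalError.
print(list(take_batches([])))      # []
print(list(take_batches([1, 2])))  # [1, 2]
```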
7e11ac43f0 fix: wandb config not saved in offline mode (#3648)
* fix: wandb config not saved in offline mode

* Apply style fixes

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
2025-07-15 17:51:44 +02:00
e2cc537db8 trackio (#3669)
* trackio

* Apply suggestions from code review

Co-authored-by: Steven Liu <59462357+stevhliu@users.noreply.github.com>
Co-authored-by: Abubakar Abid <abubakar@huggingface.co>

* seven -> eight

* Add trackio as a real tracker instead

* Sort

* Style

* Style

* Remove step

* Disable trackio on Python < 3.10

* Update src/accelerate/tracking.py

Co-authored-by: Steven Liu <59462357+stevhliu@users.noreply.github.com>

* More style

---------

Co-authored-by: Steven Liu <59462357+stevhliu@users.noreply.github.com>
Co-authored-by: Abubakar Abid <abubakar@huggingface.co>
2025-07-15 17:17:49 +02:00
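A hedged sketch of selecting the new tracker through the standard tracking API; the `"trackio"` string is assumed from the `LoggerType.TRACKIO` entry added in this PR, and trackio is only enabled on Python >= 3.10 per the commit notes.

```python
from accelerate import Accelerator

accelerator = Accelerator(log_with="trackio")  # or "swanlab", "wandb", ...
accelerator.init_trackers("my-project", config={"lr": 3e-4, "batch_size": 32})

for step in range(10):
    accelerator.log({"loss": 1.0 / (step + 1)}, step=step)

accelerator.end_training()
```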
847ae58c74 Fix FP8 tests, enable FP8 to be used without direct Accelerator() configuring (#3677)
* single-gpu tests passing

* install deepspeed in fp8 container

* revert mixed_precision check
2025-07-15 15:20:57 +02:00
6e104f31de unpin datasets (#3681) 2025-07-15 15:00:35 +02:00
524e5f9828 Speedup model loading by 4-5x in Diffusers (#3674)
* update

* update

* make style

* update

* merge if statements
2025-07-11 16:58:35 +02:00
d6c986c3f2 Bunch of FSDP improvements (#3671)
* Feat: split tests

* Feat: finito

* Fix

* Final, tests pass
2025-07-09 16:05:22 +02:00
1ac8643df7 xpu enablement on left cases (#3654)
* 1. enable XPU for the launcher 2. expand CUDA-only DeepSpeed unit tests to XPU 3. expand the profiler example to XPU

Signed-off-by: YAO Matrix <matrix.yao@intel.com>

* fix style

Signed-off-by: YAO Matrix <matrix.yao@intel.com>

* rename

Signed-off-by: YAO Matrix <matrix.yao@intel.com>

* Update profiler.py

* Apply style fixes

---------

Signed-off-by: YAO Matrix <matrix.yao@intel.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
2025-07-07 18:10:53 +02:00
07ce74868c Fix: properly error when DDP + Dtensor model (#3629)
* Feat: add check

* Refactor: nits
2025-06-27 01:33:45 +02:00
175fe91589 Added a check in the no_sync() function to avoid errors when using deepspeed zero2/3. (#3656) 2025-06-26 14:39:04 +02:00
fe16ce8bce Fix fsdp2 example (#3657) 2025-06-26 14:08:51 +02:00
5987d79a53 Update gradient_accumulation.md (#3649) 2025-06-23 11:58:31 +02:00
31af8d4e8e shards (#3645) 2025-06-20 11:24:20 +02:00
b7493a82b1 Add support for e5e2 and default to hybrid when launcher is used (#3640)
* add support for E5M2 and default to HYBRID when the launcher is used

* style
2025-06-20 11:11:32 +02:00
a16d2bb3c1 bump to v1.9.0dev 2025-06-19 15:13:41 +02:00
cac22ed980 fix grad acc deepspeed (#3638)
* fix grad acc deepspeed

* style
2025-06-19 12:06:21 +02:00
be826a6b7b Fix: correct labels (#3637) 2025-06-19 11:01:56 +02:00
5939640829 Feat: add cpu offload (#3636) 2025-06-18 18:13:45 +02:00
7f9c8cbe34 [DeepSpeed] sync gradient accum steps from deepspeed plugin (#3632)
* sync steps

* add a debug log when overriding

* make grad accum always consistent

* remove debug
2025-06-18 16:45:57 +02:00
9888c7ed23 feat: use datasets.IterableDataset shard if possible (#3635)
* feat: use datasets.IterableDataset shard if possible.

When `accelerator.prepare` is called on a
`datasets.IterableDataset`, use the `shard` method to
split the dataset across the available processes. This
allows for more efficient data loading and processing,
without the load-and-slice overhead of `IterableDatasetShard`.

* dataset

* remove unused import

* style

---------

Co-authored-by: wuwenxu.01 <wuwenxu.01@bytedance.com>
2025-06-18 16:45:17 +02:00
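A minimal sketch of the sharding path described in the commit above, mirroring the call added in `prepare_data_loader` further down (`new_dataset.shard(num_shards=..., index=...)`); the dataset name and process layout here are placeholders.

```python
from datasets import load_dataset

num_processes, process_index = 4, 0  # normally taken from the Accelerator state
ds = load_dataset("allenai/c4", "en", split="train", streaming=True)  # an IterableDataset

# Shard at the file level when there are enough shards to go around, instead of
# wrapping in IterableDatasetShard (which loads every batch and slices it).
if ds.n_shards > num_processes:
    ds = ds.shard(num_shards=num_processes, index=process_index)
```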
42a68c30dc Fix Typos in Documentation and Comments (#3621)
* Update state.py

* Update tracking.py
2025-06-18 15:53:02 +02:00
6597dae780 Integrate SwanLab for offline/online experiment tracking for Accelerate (#3605)
* add support for SwanLabTracker and update related documentation

* add emoji in FRAMWORK

* apply the style corrections and quality control

* add support for SwanLabTracker in tests

* fix bug in test_tracking
2025-06-18 15:42:29 +02:00
8878d93745 remove hardcoded cuda from fsdpv2 (#3631) 2025-06-17 14:32:10 +02:00
2eaf5cdbbc remove ipex.optimize in accelerate (#3608)
* remove ipex.optimize in accelerate

Signed-off-by: YAO Matrix <matrix.yao@intel.com>

* fix mis-style

Signed-off-by: YAO Matrix <matrix.yao@intel.com>

* Update intel_cpu.md

* Update launch.py

* fix comments

Signed-off-by: YAO Matrix <matrix.yao@intel.com>

* fix style

Signed-off-by: YAO Matrix <matrix.yao@intel.com>

* add logging

Signed-off-by: YAO Matrix <matrix.yao@intel.com>

* Update launch.py

* Apply style fixes

---------

Signed-off-by: YAO Matrix <matrix.yao@intel.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
2025-06-17 11:08:19 +02:00
23c1d8db89 [Deepspeed] deepspeed auto grad accum (#3630)
* deepspeed auto grad accum

* add tests for grad accum

* use tiny-random-gpt2

* Update tests/deepspeed/test_deepspeed_gradient_accumulation.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* fix redundant code

* set_gradient_accumulation_boundary is always there

* remove unused helper

* no need for this

* full revert

* Apply style fixes

* get_global_grad_norm is always there

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
2025-06-16 16:28:24 +02:00
0af621bbec add xpu support in TorchTensorParallelPlugin (#3627)
* add xpu support in TorchTensorParallelPlugin

Signed-off-by: YAO Matrix <matrix.yao@intel.com>

* fix typo

Signed-off-by: YAO Matrix <matrix.yao@intel.com>

---------

Signed-off-by: YAO Matrix <matrix.yao@intel.com>
2025-06-13 17:45:51 +02:00
46 changed files with 1145 additions and 277 deletions

View File

@ -15,7 +15,7 @@ jobs:
group: itac-bm-emr-gaudi3-dell-2gaudi
container:
image: docker://vault.habana.ai/gaudi-docker/1.20.0/ubuntu22.04/habanalabs/pytorch-installer-2.6.0:latest
image: docker://vault.habana.ai/gaudi-docker/1.21.1/ubuntu22.04/habanalabs/pytorch-installer-2.6.0:latest
options: --runtime=habana --shm-size=64G --cap-add=sys_nice --env HABANA_VISIBLE_DEVICES
env:
OMPI_MCA_btl_vader_single_copy_mechanism: none
@ -66,16 +66,21 @@ jobs:
run: |
make test_big_modeling
- name: Run FSDP integration tests
if: ${{ !cancelled() && (success() || failure()) }}
run: |
make test_fsdp
- name: Run DeepSpeed integration tests
if: ${{ !cancelled() && (success() || failure()) }}
run: |
make test_deepspeed
- name: Run FSDP integration tests
if: ${{ !cancelled() && (success() || failure()) }}
run: |
make test_fsdp
- name: Run TP integration tests
if: ${{ !cancelled() && (success() || failure()) }}
run: |
make test_tp
- name: Run Examples tests
if: ${{ !cancelled() && (success() || failure()) }}
run: |

View File

@ -23,16 +23,23 @@ style:
doc-builder style src/accelerate docs/source --max_len 119
# Run tests for the library
test_big_modeling:
python -m pytest -s -v ./tests/test_big_modeling.py ./tests/test_modeling_utils.py $(if $(IS_GITHUB_CI),--report-log "$(PYTORCH_VERSION)_big_modeling.log",)
test_core:
python -m pytest -s -v ./tests/ --ignore=./tests/test_examples.py --ignore=./tests/deepspeed --ignore=./tests/test_big_modeling.py \
--ignore=./tests/fsdp --ignore=./tests/tp --ignore=./tests/test_cli.py $(if $(IS_GITHUB_CI),--report-log "$(PYTORCH_VERSION)_core.log",)
python -m pytest -s -v ./tests/ \
--ignore=./tests/test_big_modeling.py \
--ignore=./tests/test_modeling_utils.py \
--ignore=./tests/test_examples.py \
--ignore=./tests/test_cli.py \
--ignore=./tests/deepspeed \
--ignore=./tests/fsdp \
--ignore=./tests/tp \
$(if $(IS_GITHUB_CI),--report-log "$(PYTORCH_VERSION)_core.log",)
test_cli:
python -m pytest -s -v ./tests/test_cli.py $(if $(IS_GITHUB_CI),--report-log "$(PYTORCH_VERSION)_cli.log",)
test_big_modeling:
python -m pytest -s -v ./tests/test_big_modeling.py ./tests/test_modeling_utils.py $(if $(IS_GITHUB_CI),--report-log "$(PYTORCH_VERSION)_big_modeling.log",)
test_deepspeed:
python -m pytest -s -v ./tests/deepspeed $(if $(IS_GITHUB_CI),--report-log "$(PYTORCH_VERSION)_deepspeed.log",)

View File

@ -7,7 +7,7 @@ RUN pip install transformers evaluate datasets
RUN git clone https://github.com/huggingface/accelerate.git
RUN cd accelerate && \
pip install -e . && \
pip install -e .[deepspeed] && \
cd benchmarks/fp8
RUN /bin/bash

View File

@ -62,8 +62,8 @@
title: Amazon SageMaker
- local: usage_guides/mps
title: Apple M1 GPUs
- local: usage_guides/ipex
title: IPEX training with CPU
- local: usage_guides/intel_cpu
title: Intel CPU
- local: usage_guides/gaudi
title: Intel Gaudi
- local: usage_guides/compilation

View File

@ -139,7 +139,7 @@ values. They can also be passed in manually.
* `--cpu` (`bool`) -- Whether or not to force the training on the CPU.
* `--multi_gpu` (`bool`) -- Whether or not this should launch a distributed GPU training.
* `--tpu` (`bool`) -- Whether or not this should launch a TPU training.
* `--ipex` (`bool`) -- Whether or not this should launch an Intel Pytorch Extension (IPEX) training.
* `--ipex` (`bool`) -- Whether or not this should launch an Intel Pytorch Extension (IPEX) training. **This argument is deprecated, will be removed in Accelerate v1.10**
**Resource Selection Arguments**:
@ -158,7 +158,7 @@ The following arguments are useful for selecting which training paradigm to use.
* `--use_deepspeed` (`bool`) -- Whether or not to use DeepSpeed for training.
* `--use_fsdp` (`bool`) -- Whether or not to use FullyShardedDataParallel for training.
* `--use_megatron_lm` (`bool`) -- Whether or not to use Megatron-LM for training.
* `--use_xpu` (`bool`) -- Whether to use IPEX plugin to speed up training on XPU specifically. **This argument is deprecated and ignored, will be removed in Accelerate v1.20**
* `--use_xpu` (`bool`) -- Whether to use IPEX plugin to speed up training on XPU specifically. **This argument is deprecated and ignored, will be removed in Accelerate v1.10**
**Distributed GPU Arguments**:

View File

@ -29,6 +29,11 @@ rendered properly in your Markdown viewer.
[[autodoc]] tracking.WandBTracker
- __init__
## Trackio
[[autodoc]] tracking.TrackioTracker
- __init__
## CometMLTracker
[[autodoc]] tracking.CometMLTracker
@ -48,3 +53,8 @@ rendered properly in your Markdown viewer.
[[autodoc]] tracking.ClearMLTracker
- __init__
## SwanLabTracker
[[autodoc]] tracking.SwanLabTracker
- __init__

View File

@ -245,7 +245,7 @@ As was pointed out in this [blog-post](https://huggingface.co/blog/gradient_accu
> [...] for gradient accumulation across token-level tasks like causal LM training, the correct loss should be computed by the **total loss across all batches in a gradient accumulation step** divided by the **total number of all non padding tokens in those batches**. This is not the same as the average of the per-batch loss values.
In other words, some adjustements must be made on losses that operate on a token-level basis.
In other words, some adjustments must be made on losses that operate on a token-level basis.
### Skeleton code
@ -282,7 +282,7 @@ for update_step in range(total_updates):
num_items_in_batch = accelerator.gather(num_items_in_batch).sum().item()
for i, batch in enumerate(batch_samples):
# if we perform gradient accumulation in a multi-devices set-up, we want to avoid unecessary communications when accumulating
# if we perform gradient accumulation in a multi-devices set-up, we want to avoid unnecessary communications when accumulating
# cf: https://muellerzr.github.io/blog/gradient_accumulation.html
if (i < len(batch_samples) - 1 and accelerator.num_processes > 1):
ctx = model.no_sync
@ -294,7 +294,7 @@ for update_step in range(total_updates):
with ctx():
inputs, targets = batch
outputs = model(inputs)
loss = loss_function(outputs, targets) # the loss function shoud sum over samples rather than averaging
loss = loss_function(outputs, targets) # the loss function should sum over samples rather than averaging
# We multiply by num_processes because the DDP calculates the average gradient across all devices whereas dividing by num_items_in_batch already takes into account all devices
# Same reason for gradient_accumulation_steps, but this times it's Accelerate that calculate the average gradient across the accumulated steps
@ -394,7 +394,7 @@ for update_step in range(total_gradient_updates):
for i, batch in enumerate(batch_samples):
inputs, labels = batch["input_ids"], batch["labels"]
total_batched_samples += 1
# if we perform gradient accumulation in a multi-devices set-up, we want to avoid unecessary communications when accumulating
# if we perform gradient accumulation in a multi-devices set-up, we want to avoid unnecessary communications when accumulating
# cf: https://muellerzr.github.io/blog/gradient_accumulation.html
if (i < len(batch_samples) - 1 and accelerator.num_processes > 1):
ctx = model.no_sync
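As an aside on the token-level loss note in the hunks above, a tiny worked example (made-up numbers) of why total loss divided by total non-padding tokens differs from averaging per-batch means:

```python
# Two accumulated micro-batches:
#   batch 1: summed token loss 20.0 over 10 non-padding tokens
#   batch 2: summed token loss 30.0 over 30 non-padding tokens
correct = (20.0 + 30.0) / (10 + 30)  # 1.25 -> total loss / total non-padding tokens
naive = (20.0 / 10 + 30.0 / 30) / 2  # 1.50 -> mean of per-batch means (biased)
print(correct, naive)
```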

View File

@ -13,34 +13,11 @@ specific language governing permissions and limitations under the License.
rendered properly in your Markdown viewer.
-->
# Intel® Extension for PyTorch
[IPEX](https://github.com/intel/intel-extension-for-pytorch) is optimized for CPUs with AVX-512 or above, and functionally works for CPUs with only AVX2. So, it is expected to bring performance benefit for Intel CPU generations with AVX-512 or above while CPUs with only AVX2 (e.g., AMD CPUs or older Intel CPUs) might result in a better performance under IPEX, but not guaranteed. IPEX provides performance optimizations for CPU training with both Float32 and BFloat16. The usage of BFloat16 is the main focus of the following sections.
Low precision data type BFloat16 has been natively supported on the 3rd Generation Xeon® Scalable Processors (aka Cooper Lake) with AVX512 instruction set and will be supported on the next generation of Intel® Xeon® Scalable Processors with Intel® Advanced Matrix Extensions (Intel® AMX) instruction set with further boosted performance. The Auto Mixed Precision for CPU backend has been enabled since PyTorch-1.10. At the same time, the support of Auto Mixed Precision with BFloat16 for CPU and BFloat16 optimization of operators has been massively enabled in Intel® Extension for PyTorch, and partially upstreamed to PyTorch master branch. Users can get better performance and user experience with IPEX Auto Mixed Precision.
## IPEX installation:
IPEX release is following PyTorch, to install via pip:
| PyTorch Version | IPEX version |
| :---------------: | :----------: |
| 2.0 | 2.0.0 |
| 1.13 | 1.13.0 |
| 1.12 | 1.12.300 |
| 1.11 | 1.11.200 |
| 1.10 | 1.10.100 |
```
pip install intel_extension_for_pytorch==<version_name> -f https://developer.intel.com/ipex-whl-stable-cpu
```
Check more approaches for [IPEX installation](https://intel.github.io/intel-extension-for-pytorch/cpu/latest/tutorials/installation.html).
# Training on Intel CPU
## How It Works For Training optimization in CPU
Accelerate has integrated [IPEX](https://github.com/intel/intel-extension-for-pytorch), all you need to do is enabling it through the config.
Accelerate has full support for Intel CPU, all you need to do is enabling it through the config.
**Scenario 1**: Acceleration of No distributed CPU training
@ -55,7 +32,6 @@ This machine
Which type of machine are you using?
No distributed training
Do you want to run your training on CPU only (even if a GPU / Apple Silicon device is available)? [yes/NO]:yes
Do you want to use Intel PyTorch Extension (IPEX) to speed up training on CPU? [yes/NO]:yes
Do you wish to optimize your script with torch dynamo?[yes/NO]:NO
Do you want to use DeepSpeed? [yes/NO]: NO
-----------------------------------------------------------------------------------------------------------------------------------------------------------
@ -69,15 +45,12 @@ default options when doing
accelerate launch my_script.py --args_to_my_script
```
For instance, here is how you would run the NLP example `examples/nlp_example.py` (from the root of the repo) with IPEX enabled.
default_config.yaml that is generated after `accelerate config`
For instance, here is how you would run the NLP example `examples/nlp_example.py` (from the root of the repo) with `default_config.yaml` which is generated by `accelerate config`
```bash
compute_environment: LOCAL_MACHINE
distributed_type: 'NO'
downcast_bf16: 'no'
ipex_config:
ipex: true
machine_rank: 0
main_training_function: main
mixed_precision: bf16
@ -117,7 +90,6 @@ What is the rank of this machine?
What is the IP address of the machine that will host the main process? 36.112.23.24
What is the port you will use to communicate with the main process? 29500
Are all the machines on the same local network? Answer `no` if nodes are on the cloud and/or on different network hosts [YES/no]: yes
Do you want to use Intel PyTorch Extension (IPEX) to speed up training on CPU? [yes/NO]:yes
Do you want accelerate to launch mpirun? [yes/NO]: yes
Please enter the path to the hostfile to use with mpirun [~/hostfile]: ~/hostfile
Enter the number of oneCCL worker threads [1]: 1
@ -129,13 +101,11 @@ bf16
```
For instance, here is how you would run the NLP example `examples/nlp_example.py` (from the root of the repo) with IPEX enabled for distributed CPU training.
default_config.yaml that is generated after `accelerate config`
`default_config.yaml` which is generated by `accelerate config`
```bash
compute_environment: LOCAL_MACHINE
distributed_type: MULTI_CPU
downcast_bf16: 'no'
ipex_config:
ipex: true
machine_rank: 0
main_process_ip: 36.112.23.24
main_process_port: 29500
@ -156,8 +126,10 @@ use_cpu: true
Set following env and using intel MPI to launch the training
In node0, you need to create a configuration file which contains the IP addresses of each node (for example hostfile) and pass that configuration file path as an argument.
If you selected to have Accelerate launch `mpirun`, ensure that the location of your hostfile matches the path in the config.
In `node0`, you need to create a configuration file which contains the IP addresses of each node (for example hostfile) and pass that configuration file path as an argument.
If you selected to let Accelerate launch `mpirun`, ensure that the location of your hostfile matches the path in the config.
```bash
$ cat hostfile
xxx.xxx.xxx.xxx #node0 ip
@ -165,18 +137,18 @@ xxx.xxx.xxx.xxx #node1 ip
xxx.xxx.xxx.xxx #node2 ip
xxx.xxx.xxx.xxx #node3 ip
```
When Accelerate is launching `mpirun`, source the oneCCL bindings setvars.sh to get your Intel MPI environment, and then
run your script using `accelerate launch`. Note that the python script and environment needs to exist on all of the
machines being used for multi-CPU training.
Before executing `accelerate launch` command, you need source the oneCCL bindings `setvars.sh` to get your Intel MPI environment properly. Note that both the python script and environment need to be available on all of the machines being used for multi-CPU training.
```bash
oneccl_bindings_for_pytorch_path=$(python -c "from oneccl_bindings_for_pytorch import cwd; print(cwd)")
source $oneccl_bindings_for_pytorch_path/env/setvars.sh
accelerate launch examples/nlp_example.py
```
Otherwise, if you selected not to have Accelerate launch `mpirun`, run the following command in node0 and **16DDP** will
be enabled in node0,node1,node2,node3 with BF16 mixed precision. When using this method, the python script, python
environment, and accelerate config file need to be present on all of the machines used for multi-CPU training.
You can also directly launch distributed training with `mpirun` command, you need to run the following command in node0 and **16DDP** will be enabled in node0,node1,node2,node3 with BF16 mixed precision. When using this method, the python script, python environment, and accelerate config file need to be available on all of the machines used for multi-CPU training.
```bash
oneccl_bindings_for_pytorch_path=$(python -c "from oneccl_bindings_for_pytorch import cwd; print(cwd)")
source $oneccl_bindings_for_pytorch_path/env/setvars.sh
@ -185,11 +157,3 @@ export MASTER_ADDR=xxx.xxx.xxx.xxx #node0 ip
export CCL_ATL_TRANSPORT=ofi
mpirun -f hostfile -n 16 -ppn 4 accelerate launch examples/nlp_example.py
```
## Related Resources
- [Project's github](https://github.com/intel/intel-extension-for-pytorch)
- [API docs](https://intel.github.io/intel-extension-for-pytorch/cpu/latest/tutorials/api_doc.html)
- [Tuning guide](https://intel.github.io/intel-extension-for-pytorch/cpu/latest/tutorials/performance_tuning/tuning_guide.html)
- [Blogs & Publications](https://intel.github.io/intel-extension-for-pytorch/cpu/latest/tutorials/blogs_publications.html)

View File

@ -20,10 +20,11 @@ Accelerate provides a general tracking API that can be used to log useful items
## Integrated Trackers
Currently `Accelerate` supports seven trackers out-of-the-box:
Currently `Accelerate` supports eight trackers out-of-the-box:
- TensorBoard
- WandB
- WandB
- Trackio
- CometML
- Aim
- MLFlow

View File

@ -218,7 +218,7 @@ def parse_args():
default="all",
help=(
'The integration to report the results and logs to. Supported platforms are `"tensorboard"`,'
' `"wandb"`, `"comet_ml"`, and `"dvclive"`. Use `"all"` (default) to report to all integrations.'
' `"wandb"`, `"comet_ml"`, `"dvclive"`, and `"swanlab"`. Use `"all"` (default) to report to all integrations.'
"Only applicable when `--with_tracking` is passed."
),
)

View File

@ -215,7 +215,7 @@ def parse_args():
default="all",
help=(
'The integration to report the results and logs to. Supported platforms are `"tensorboard"`,'
' `"wandb"`, `"comet_ml"`, and `"dvclive"`. Use `"all"` (default) to report to all integrations.'
' `"wandb"`, `"comet_ml"`, and `"dvclive"`, and `"swanlab"`. Use `"all"` (default) to report to all integrations.'
"Only applicable when `--with_tracking` is passed."
),
)

View File

@ -31,8 +31,8 @@ from accelerate.utils import ProfileKwargs
#
# This example trains a Bert base model on GLUE MRPC
# in any of the following settings (with the same script):
# - single CPU or single GPU
# - multi GPUS (using PyTorch distributed mode)
# - single CPU or single device (CUDA GPU, Intel XPU etc.)
# - multi devices (using PyTorch distributed mode)
# - (multi) TPUs
# - fp16 (mixed-precision) or fp32 (normal precision)
#
@ -183,7 +183,8 @@ def training_function(config, args):
# New Code #
accelerator.print(
prof.key_averages().table(
sort_by="self_cpu_time_total" if args.cpu else "self_cuda_time_total", row_limit=-1
sort_by="self_cpu_time_total" if args.cpu else f"self_{accelerator.device.type}_time_total",
row_limit=-1,
)
)
@ -215,7 +216,7 @@ def main():
choices=["no", "fp16", "bf16", "fp8"],
help="Whether to use mixed precision. Choose"
"between fp16 and bf16 (bfloat16). Bf16 requires PyTorch >= 1.10."
"and an Nvidia Ampere GPU.",
"and an Nvidia Ampere GPU or an Intel XPU.",
)
# New Code #
parser.add_argument(
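As a side note, a hedged sketch of the profiling pattern the example script above relies on; the `ProfileKwargs` options shown are illustrative, and `accelerator.profile()` is assumed to yield a standard `torch.profiler` object as in the script.

```python
import torch
from accelerate import Accelerator
from accelerate.utils import ProfileKwargs

profile_kwargs = ProfileKwargs(record_shapes=True)  # options here are illustrative
accelerator = Accelerator(kwargs_handlers=[profile_kwargs])

model = torch.nn.Linear(8, 8).to(accelerator.device)
inputs = torch.randn(4, 8, device=accelerator.device)

with accelerator.profile() as prof:
    model(inputs)

# Device-agnostic sort key, as in the hunk above (works for cuda, xpu, hpu, cpu).
sort_key = f"self_{accelerator.device.type}_time_total"
accelerator.print(prof.key_averages().table(sort_by=sort_key, row_limit=-1))
```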

View File

@ -11,8 +11,8 @@ fp8_config:
fp8_format: E4M3
interval: 1
margin: 0
override_linear_precision: (false, false, false)
override_linear_precision: [false, false, false]
# Generally this should always be set to `false` to have the most realistic fp8 eval performance
use_autocast_during_eval: false
# If using MS-AMP, we ignore all of the prior and set a opt_level
#opt_level: O1
#opt_level: O1

View File

@ -32,5 +32,5 @@ In our example, we use a 8B Llama3.1 model, which has a hidden dimension of 4096
The figures above were generated on 8x H100 SXM GPUs, with 8192 sequence length and 1000 steps. To run the example, you can use the following command, where you can specify the precision to train in:
```bash
accelerate launch --fsdp2_fp8.py --sequence_length 8192 --num_steps 1000 --log_with wandb --precision [fp8 | bf16]
accelerate launch fsdp2_fp8.py --sequence-length 8192 --num-steps 1000 --log_with wandb --precision [fp8 | bf16]
```

View File

@ -187,7 +187,10 @@ def main():
def collate_fn(batch):
input_ids = torch.tensor([item["input_ids"] for item in batch], dtype=torch.long)
labels = torch.tensor([item["labels"] for item in batch], dtype=torch.long)
return {"input_ids": input_ids, "labels": labels}
# Transformers expect `labels` to not be shifted, though we already shifted them, so we pass them both
# We need to pass both `shift_labels` and `labels` to the model, as the loss is calculated inside `if labels is not None`
# `shift_labels` take precedence over `labels` in this case
return {"input_ids": input_ids, "labels": labels, "shift_labels": labels}
# We keep batch size at 1, as it is basically the same as sequence length, which we use instead
dataloader = DataLoader(dataset, batch_size=1, collate_fn=collate_fn)

View File

@ -1,5 +1,5 @@
accelerate # used to be installed in Amazon SageMaker environment
evaluate
datasets==2.3.2
datasets
schedulefree
huggingface_hub>=0.20.0

View File

@ -41,7 +41,16 @@ extras["deepspeed"] = ["deepspeed"]
extras["rich"] = ["rich"]
extras["test_fp8"] = ["torchao"] # note: TE for now needs to be done via pulling down the docker image directly
extras["test_trackers"] = ["wandb", "comet-ml", "tensorboard", "dvclive", "mlflow", "matplotlib"]
extras["test_trackers"] = [
"wandb",
"comet-ml",
"tensorboard",
"dvclive",
"mlflow",
"matplotlib",
"swanlab",
"trackio",
]
extras["dev"] = extras["quality"] + extras["testing"] + extras["rich"]
extras["sagemaker"] = [
@ -50,7 +59,7 @@ extras["sagemaker"] = [
setup(
name="accelerate",
version="1.8.0.dev0",
version="1.9.0",
description="Accelerate",
long_description=open("README.md", encoding="utf-8").read(),
long_description_content_type="text/markdown",

View File

@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
__version__ = "1.8.0.dev0"
__version__ = "1.9.0"
from .accelerator import Accelerator
from .big_modeling import (

View File

@ -33,6 +33,8 @@ import torch
import torch.utils.hooks as hooks
from huggingface_hub import split_torch_state_dict_into_shards
from accelerate.utils.dataclasses import FP8BackendType
from .checkpointing import load_accelerator_state, load_custom_state, save_accelerator_state, save_custom_state
from .data_loader import DataLoaderDispatcher, prepare_data_loader, skip_first_batches
from .logging import get_logger
@ -108,6 +110,7 @@ from .utils import (
is_xpu_available,
load_fsdp_model,
load_fsdp_optimizer,
model_has_dtensor,
pad_across_processes,
parse_choice_from_env,
recursively_apply,
@ -124,6 +127,7 @@ from .utils.constants import (
FSDP2_PYTORCH_VERSION,
FSDP_PYTORCH_VERSION,
PROFILE_PATTERN_NAME,
SCALER_NAME,
)
from .utils.modeling import get_state_dict_offloaded_model
from .utils.other import compile_regions, compile_regions_deepspeed, is_compiled_module
@ -229,7 +233,12 @@ class Accelerator:
- `"all"`
- `"tensorboard"`
- `"wandb"`
- `"trackio"`
- `"aim"`
- `"comet_ml"`
- `"mlflow"`
- `"dvclive"`
- `"swanlab"`
If `"all"` is selected, will pick up all available trackers in the environment and initialize them. Can
also accept implementations of `GeneralTracker` for custom trackers, and can be combined with `"all"`.
project_config ([`~utils.ProjectConfiguration`], *optional*):
@ -298,6 +307,7 @@ class Accelerator:
self.project_configuration = ProjectConfiguration(project_dir=project_dir)
if project_dir is not None and self.project_dir is None:
self.project_configuration.set_directories(project_dir)
if mixed_precision is not None:
mixed_precision = str(mixed_precision)
if mixed_precision not in PrecisionType:
@ -319,7 +329,7 @@ class Accelerator:
if deepspeed_plugins is None:
# First check if we're creating another `Accelerator` w/o setting `deepspeed_plugin`
if PartialState._shared_state != {} and PartialState().distributed_type == DistributedType.DEEPSPEED:
if AcceleratorState._shared_state != {} and AcceleratorState().distributed_type == DistributedType.DEEPSPEED:
deepspeed_plugins = AcceleratorState().deepspeed_plugins
else:
# init from env variables
@ -329,8 +339,8 @@ class Accelerator:
else:
# If we're creating a second `Accelerator`, users shouldn't be passing in a `deepspeed_plugin`
if (
PartialState().distributed_type == DistributedType.DEEPSPEED
and AcceleratorState._shared_state != {}
AcceleratorState._shared_state != {}
and AcceleratorState().distributed_type == DistributedType.DEEPSPEED
and AcceleratorState().deepspeed_plugins is not None
):
raise NotImplementedError(
@ -455,27 +465,34 @@ class Accelerator:
# Check for automatic FP8 recipe creation
if self.fp8_enabled and not self.has_fp8_handler:
# Prioritize AO -> TE -> MSAMP
if is_torchao_available():
logger.info("Found `torchao` installed, using it for FP8 training.")
if self.fp8_backend == FP8BackendType.AO:
self.ao_recipe_handler = AORecipeKwargs()
elif is_transformer_engine_available():
logger.info("Found `transformer-engine` installed, using it for FP8 training.")
elif self.fp8_backend == FP8BackendType.TE:
self.te_recipe_handler = TERecipeKwargs()
elif is_msamp_available():
logger.info("Found `msamp` installed, using it for FP8 training.")
elif self.fp8_backend == FP8BackendType.MSAMP:
self.msamp_recipe_handler = MSAMPRecipeKwargs()
else:
raise ImportError(
"Tried to train with `fp8` and auto-detect backend, but no FP8-compatible backend was installed. "
"Valid backends are: `torchao`, `transformer-engine`, and `msamp`."
)
elif self.fp8_backend == FP8BackendType.NO:
# Prioritize AO -> TE -> MSAMP
if is_torchao_available():
logger.info("Found `torchao` installed, using it for FP8 training.")
self.ao_recipe_handler = AORecipeKwargs()
elif is_transformer_engine_available():
logger.info("Found `transformer-engine` installed, using it for FP8 training.")
self.te_recipe_handler = TERecipeKwargs()
elif is_msamp_available():
logger.info("Found `msamp` installed, using it for FP8 training.")
self.msamp_recipe_handler = MSAMPRecipeKwargs()
else:
raise ImportError(
"Tried to train with `fp8` and auto-detect backend, but no FP8-compatible backend was installed. "
"Valid backends are: `torchao`, `transformer-engine`, and `msamp`."
)
self.has_fp8_handler = True
self.delayed_fp8_autocast = False
if self.has_fp8_handler:
# We already check if FP8 is available during `self.state`
if mixed_precision != "fp8" and (
if not self.fp8_enabled and (
self.distributed_type not in (DistributedType.FSDP, DistributedType.DEEPSPEED)
):
raise ValueError("Passing in an FP8 configuration requires setting `mixed_precision='fp8'`.")
@ -485,7 +502,11 @@ class Accelerator:
)
# TODO: S1ro - this is probably gonna be a problem with other fp8 backends too
if self.fp8_backend == "AO" and self.state.fsdp_plugin.cpu_ram_efficient_loading:
if (
self.fp8_backend == FP8BackendType.AO
and self.state.distributed_type == DistributedType.FSDP
and self.state.fsdp_plugin.cpu_ram_efficient_loading
):
raise ValueError(
"torchao with FSDP2 and cpu_ram_efficient_loading is not supported, setting `cpu_ram_efficient_loading` to False will fix the issue and work as intended."
)
@ -512,6 +533,8 @@ class Accelerator:
parse_choice_from_env("ACCELERATE_GRADIENT_ACCUMULATION_STEPS", gradient_accumulation_steps)
)
gradient_accumulation_plugin = GradientAccumulationPlugin(num_steps=gradient_accumulation_steps)
# If using DeepSpeed, update gradient accumulation steps from the DeepSpeed plugin
self.gradient_state = GradientState(
gradient_accumulation_plugin=gradient_accumulation_plugin,
)
@ -567,7 +590,7 @@ class Accelerator:
elif self.fp8_enabled:
# We always enable `native_amp` for FP8
self.native_amp = True
if self.fp8_backend == "MSAMP":
if self.fp8_backend == FP8BackendType.MSAMP:
if self.distributed_type == DistributedType.FSDP:
raise NotImplementedError(
"`accelerate` + `MS-AMP` + `FSDP` is not supported at this time. "
@ -1039,7 +1062,8 @@ class Accelerator:
"""
context = contextlib.nullcontext
if self.use_distributed:
context = getattr(model, "no_sync", context)
if self.distributed_type != DistributedType.DEEPSPEED or self.state.deepspeed_plugin.zero_stage < 2:
context = getattr(model, "no_sync", context)
with context():
yield
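For orientation, a hedged usage sketch of the public `no_sync` context manager guarded in the hunk above; `model`, `optimizer`, `dataloader`, and `accelerator` are assumed to have been prepared already. Under DeepSpeed ZeRO-2/3 the manager now degrades to a null context, since the engine handles accumulation itself.

```python
import contextlib

accumulation_steps = 4
for i, batch in enumerate(dataloader):
    is_sync_step = (i + 1) % accumulation_steps == 0
    # Skip gradient all-reduce on non-final micro-batches.
    ctx = contextlib.nullcontext() if is_sync_step else accelerator.no_sync(model)
    with ctx:
        loss = model(**batch).loss / accumulation_steps
        accelerator.backward(loss)
    if is_sync_step:
        optimizer.step()
        optimizer.zero_grad()
```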
@ -1404,11 +1428,18 @@ class Accelerator:
old_named_params = self._get_named_parameters(*args, drop_refs=False)
if self.distributed_type in [DistributedType.MULTI_CPU, DistributedType.MULTI_XPU, DistributedType.NO]:
if (self.device.type == "cpu" or self.device.type == "xpu") and self.state.use_ipex:
if (
is_torch_version("<", "2.7.0")
and (self.device.type == "cpu" or self.device.type == "xpu")
and self.state.use_ipex
):
logger.warning(
"You are using lower version of PyTorch(< 2.7.0) with ipex acceleration on Intel CPU or XPU, Intel has upstreamed most of the optimizations into stock PyTorch from 2.7.0, we enourage you to install the latest stock PyTorch and enjoy the out-of-experience on Intel CPU/XPU."
)
args = self._prepare_ipex(*args)
if self.fp8_backend == "TE":
if self.fp8_backend == FP8BackendType.TE:
args = self._prepare_te(*args)
elif self.fp8_backend == "AO":
elif self.fp8_backend == FP8BackendType.AO:
args = self._prepare_ao(*args)
if self.distributed_type == DistributedType.DEEPSPEED:
result = self._prepare_deepspeed(*args)
@ -1417,7 +1448,7 @@ class Accelerator:
elif self.is_fsdp2:
result = self._prepare_fsdp2(*args)
else:
if self.fp8_backend == "MSAMP":
if self.fp8_backend == FP8BackendType.MSAMP:
args, device_placement = self._prepare_msamp(*args, device_placement=device_placement)
result = tuple(
self._prepare_one(obj, first_pass=True, device_placement=d) for obj, d in zip(args, device_placement)
@ -1557,7 +1588,7 @@ class Accelerator:
model._original_forward = model.forward
autocast_context = get_mixed_precision_context_manager(self.native_amp, self.autocast_handler)
# NOTE: MS-AMP adds `__func__` already to `model.forward`, so we should always use `model.forward`
if self.fp8_backend == "MSAMP" or not hasattr(model.forward, "__func__"):
if self.fp8_backend == FP8BackendType.MSAMP or not hasattr(model.forward, "__func__"):
model_forward_func = model.forward
model.forward = convert_outputs_to_fp32(autocast_context(model_forward_func))
else:
@ -1567,7 +1598,7 @@ class Accelerator:
model.forward = MethodType(convert_outputs_to_fp32(model.forward.__func__), model)
# We prepare TE after, allowing for bf16 autocast to happen first
if self.fp8_backend == "TE" and not self.delayed_fp8_autocast:
if self.fp8_backend == FP8BackendType.TE and not self.delayed_fp8_autocast:
model = apply_fp8_autowrap(model, self.te_recipe_handler or self.fp8_recipe_handler)
if (getattr(model, "is_loaded_in_8bit", False) or getattr(model, "is_loaded_in_4bit", False)) and getattr(
@ -1620,6 +1651,10 @@ class Accelerator:
DistributedType.MULTI_XPU,
DistributedType.MULTI_HPU,
):
if model_has_dtensor(model):
raise ValueError(
"Your model contains `DTensor` parameters, which is incompatible with DDP. Maybe you loaded your model with `device_map='auto'`? Specify `device_map='cuda'` or 'cpu' instead."
)
if any(p.requires_grad for p in model.parameters()):
kwargs = self.ddp_handler.to_kwargs() if self.ddp_handler is not None else {}
# TODO: Look at enabling native TP training directly with a proper config
@ -1789,7 +1824,7 @@ class Accelerator:
elif self.distributed_type == DistributedType.XLA and self.state.fork_launched:
model = xmp.MpModelWrapper(model).to(self.device)
# Now we can apply the FP8 autocast
if self.fp8_backend == "TE" and self.delayed_fp8_autocast:
if self.fp8_backend == FP8BackendType.TE and self.delayed_fp8_autocast:
model = apply_fp8_autowrap(model, self.te_recipe_handler or self.fp8_recipe_handler)
# torch.compile should be called last and only if the model isn't already compiled
if self.state.dynamo_plugin.backend != DynamoBackend.NO and not is_compiled_module(model):
@ -1867,7 +1902,7 @@ class Accelerator:
import deepspeed
ds_initialize = deepspeed.initialize
if self.fp8_backend == "MSAMP":
if self.fp8_backend == FP8BackendType.MSAMP:
# MS-AMP requires DeepSpeed patches
from msamp import deepspeed as msamp_deepspeed
@ -1931,6 +1966,15 @@ class Accelerator:
gradient_accumulation_steps=self.gradient_accumulation_steps,
)
deepspeed_gradient_accumulation_steps = deepspeed_plugin.get_value("gradient_accumulation_steps")
# update gradient_accumulation_steps if there is a mismatch
if deepspeed_gradient_accumulation_steps != self.gradient_accumulation_steps:
logger.warning(
f"Gradient accumulation steps mismatch: GradientAccumulationPlugin has {self.gradient_accumulation_steps}, "
f"DeepSpeed config has {deepspeed_gradient_accumulation_steps}. Using DeepSpeed's value."
)
self.gradient_accumulation_steps = deepspeed_gradient_accumulation_steps
config_kwargs = {
"gradient_clipping": 1.0,
"zero_optimization.stage3_gather_16bit_weights_on_model_save": False,
@ -1996,7 +2040,7 @@ class Accelerator:
if model is not None:
# If we are using FP8, we need to apply the autowrap now
if self.fp8_backend == "TE":
if self.fp8_backend == FP8BackendType.TE:
model = apply_fp8_autowrap(model, self.fp8_recipe_handler)
# if the model is an MOE, set the appropriate MOE layers as leaf Z3 modules
deepspeed_plugin.set_moe_leaf_modules(model)
@ -2453,7 +2497,7 @@ class Accelerator:
device_placement = self.device_placement
# NOTE: Special case with MS-AMP we do *not* pass in the scaler explicitly to the `AcceleratedOptimizer`,
# Their optimizer handles it for us.
scaler = None if self.fp8_backend == "MSAMP" else self.scaler
scaler = None if self.fp8_backend == FP8BackendType.MSAMP else self.scaler
optimizer = AcceleratedOptimizer(optimizer, device_placement=device_placement, scaler=scaler)
self._optimizers.append(optimizer)
return optimizer
@ -2523,7 +2567,7 @@ class Accelerator:
# deepspeed handles loss scaling by gradient_accumulation_steps in its `backward`
loss = loss / self.gradient_accumulation_steps
if self.distributed_type == DistributedType.DEEPSPEED:
self.deepspeed_engine_wrapped.backward(loss, **kwargs)
self.deepspeed_engine_wrapped.backward(loss, sync_gradients=self.sync_gradients, **kwargs)
elif self.distributed_type == DistributedType.MEGATRON_LM:
return
elif self.scaler is not None:
@ -2664,8 +2708,9 @@ class Accelerator:
parameters, max_norm, norm_type=norm_type
) # viz: https://github.com/pytorch/torchtitan/blob/main/docs/fsdp.md
elif self.distributed_type == DistributedType.DEEPSPEED:
# `accelerator.backward(loss)` is doing that automatically. Therefore, its implementation is not needed
# We cannot return the gradient norm because DeepSpeed does it.
# DeepSpeed handles gradient clipping internally, but we can retrieve the gradient norm
if self.deepspeed_engine_wrapped is not None:
return self.deepspeed_engine_wrapped.get_global_grad_norm()
return None
elif self.distributed_type == DistributedType.XLA:
# Reduce gradients first for XLA
@ -3495,6 +3540,21 @@ class Accelerator:
else:
models.append(model)
# We need to load the scaler state before the optimizer for FSDP2
# (`torch.distributed.checkpoint.set_optimizer_state_dict`) which we use to set the state of the optimizer calls `optimizer.step` on
# a dummy tensor, but since the scaler is not initialized, it will raise an error (the scaler exists but its `_scale` is None)
scaler = None
if self.scaler is not None and self.is_fsdp2:
input_scaler_file = os.path.join(input_dir, SCALER_NAME)
scaler_state = torch.load(input_scaler_file)
self.scaler.load_state_dict(scaler_state)
# We also need to call the `_lazy_init_scale_growth_tracker` to initialize the scaler, as it would else be called
# on the first call to scale
self.scaler._lazy_init_scale_growth_tracker(self.scaler._device)
logger.info("GradScaler state loaded successfully")
else:
scaler = self.scaler
# Load the optimizers taking care of FSDP and DeepSpeed nuances
optimizers = []
if self.distributed_type == DistributedType.FSDP:
@ -3543,7 +3603,7 @@ class Accelerator:
schedulers,
dataloaders,
self.state.process_index,
self.scaler,
scaler,
map_location,
load_kwargs,
**load_model_func_kwargs,
@ -3626,7 +3686,7 @@ class Accelerator:
# we need this bit as `WeightWithDynamic...` returns 0 when `data_ptr()` is called,
# the underlying pointer is actually hidden in `_tensor` attribute
if self.fp8_backend == "AO":
if self.fp8_backend == FP8BackendType.AO:
from torchao.float8.fsdp_utils import WeightWithDynamicFloat8CastTensor
accessor_mapping[WeightWithDynamicFloat8CastTensor] = "_tensor"
@ -3713,8 +3773,7 @@ class Accelerator:
elif self.is_fsdp2:
from torch.distributed.checkpoint.state_dict import StateDictOptions, get_model_state_dict
# This hangs if `cpu_offload` is also True
options = StateDictOptions(full_state_dict=True, broadcast_from_rank0=True)
options = StateDictOptions(full_state_dict=True, broadcast_from_rank0=True, cpu_offload=True)
state_dict = get_model_state_dict(model, options=options)
elif self.distributed_type == DistributedType.FSDP:
from torch.distributed.fsdp import FullStateDictConfig, StateDictType
@ -3936,17 +3995,18 @@ class Accelerator:
)
@property
def fp8_backend(self):
def fp8_backend(self) -> FP8BackendType:
"Returns the configured backend for training in FP8"
if self.has_fp8_handler:
if self.fp8_recipe_handler is not None:
return self.fp8_recipe_handler.backend
return FP8BackendType(self.fp8_recipe_handler.backend)
elif self.ao_recipe_handler is not None:
return "AO"
return FP8BackendType.AO
elif self.te_recipe_handler is not None:
return "TE"
return FP8BackendType.TE
elif self.msamp_recipe_handler is not None:
return "MSAMP"
return FP8BackendType.MSAMP
elif self.state.deepspeed_plugin is not None and self.state.deepspeed_plugin.enable_msamp:
return "MSAMP"
return None
return FP8BackendType.MSAMP
return FP8BackendType(parse_choice_from_env("ACCELERATE_FP8_BACKEND", "NO"))

View File

@ -774,8 +774,8 @@ def get_cluster_input():
)
fp8_config["fp8_format"] = _ask_options(
"Which weight format should be used?",
["HYBRID", "E4M3"],
lambda x: "HYBRID" if x == 0 else "E4M3",
["HYBRID", "E4M3", "E5M2"],
lambda i: ["HYBRID", "E4M3", "E5M2"][i],
default=0,
)
fp8_config["amax_history_length"] = _ask_field(

View File

@ -692,8 +692,8 @@ def launch_command_parser(subparsers=None):
fp8_args.add_argument(
"--fp8_format",
type=str,
default="E4M3",
choices=["E4M3", "HYBRID"],
default="HYBRID",
choices=["HYBRID", "E4M3", "E5M2"],
help="The format to use for the FP8 recipe (useful only when `--fp8_backend=te` is passed).",
)
fp8_args.add_argument(
@ -1151,6 +1151,12 @@ def _validate_launch_command(args):
f"\t`--num_cpu_threads_per_process` was set to `{args.num_cpu_threads_per_process}` to improve out-of-box performance when training on CPUs"
)
if args.ipex is not None:
logger.warning(
"ipex flag is deprecated, will be removed in Accelerate v1.10. "
"From 2.7.0, PyTorch has all needed optimizations for Intel CPU and XPU."
)
if args.use_xpu is not None:
logger.warning(
"use_xpu is deprecated and ignored, will be removed in Accelerate v1.20. "

View File

@ -32,6 +32,7 @@ from .utils import (
find_batch_size,
get_data_structure,
initialize_tensors,
is_datasets_available,
is_torch_version,
is_torchdata_stateful_dataloader_available,
send_to_device,
@ -565,7 +566,8 @@ class DataLoaderShard(DataLoaderAdapter, DataLoaderStateMixin):
try:
current_batch = next(dataloader_iter)
except StopIteration:
yield
self.end()
return
batch_index = 0
while True:
@ -1194,7 +1196,16 @@ def prepare_data_loader(
dataloader.sampler.generator = generator
# No change if no multiprocess
if (num_processes != 1 or state.distributed_type == DistributedType.MEGATRON_LM) and not dispatch_batches:
if isinstance(new_dataset, IterableDataset):
if is_datasets_available():
from datasets import IterableDataset as DatasetsIterableDataset
if (
is_datasets_available()
and isinstance(new_dataset, DatasetsIterableDataset)
and not split_batches
and new_dataset.n_shards > num_processes
):
new_dataset = new_dataset.shard(num_shards=num_processes, index=process_index)
elif isinstance(new_dataset, IterableDataset):
if getattr(dataloader.dataset, "generator", None) is not None:
synchronized_generator = dataloader.dataset.generator
new_dataset = IterableDatasetShard(

View File

@ -60,8 +60,8 @@ def notebook_launcher(
<Tip warning={true}>
To use this function absolutely zero calls to a CUDA device must be made in the notebook session before calling. If
any have been made, you will need to restart the notebook and make sure no cells use any CUDA capability.
To use this function absolutely zero calls to a device must be made in the notebook session before calling. If any
have been made, you will need to restart the notebook and make sure no cells use any device capability.
Setting `ACCELERATE_DEBUG_MODE="1"` in your environment will run a test before truly launching to ensure that none
of those calls have been made.
@ -76,11 +76,11 @@ def notebook_launcher(
Tuple of arguments to pass to the function (it will receive `*args`).
num_processes (`int`, *optional*):
The number of processes to use for training. Will default to 8 in Colab/Kaggle if a TPU is available, to
the number of GPUs available otherwise.
the number of devices available otherwise.
mixed_precision (`str`, *optional*, defaults to `"no"`):
If `fp16` or `bf16`, will use mixed precision training on multi-GPU.
If `fp16` or `bf16`, will use mixed precision training on multi-device.
use_port (`str`, *optional*, defaults to `"29500"`):
The port to use to communicate between processes when launching a multi-GPU training.
The port to use to communicate between processes when launching a multi-device training.
master_addr (`str`, *optional*, defaults to `"127.0.0.1"`):
The address to use for communication between processes.
node_rank (`int`, *optional*, defaults to 0):
@ -105,7 +105,7 @@ def notebook_launcher(
Example:
```python
# Assume this is defined in a Jupyter Notebook on an instance with two GPUs
# Assume this is defined in a Jupyter Notebook on an instance with two devices
from accelerate import notebook_launcher
@ -158,27 +158,27 @@ def notebook_launcher(
else:
if num_processes is None:
raise ValueError(
"You have to specify the number of GPUs you would like to use, add `num_processes=...` to your call."
"You have to specify the number of devices you would like to use, add `num_processes=...` to your call."
)
if node_rank >= num_nodes:
raise ValueError("The node_rank must be less than the number of nodes.")
if num_processes > 1:
# Multi-GPU launch
# Multi-device launch
from torch.distributed.launcher.api import LaunchConfig, elastic_launch
from torch.multiprocessing import start_processes
from torch.multiprocessing.spawn import ProcessRaisedException
if len(AcceleratorState._shared_state) > 0:
raise ValueError(
"To launch a multi-GPU training from your notebook, the `Accelerator` should only be initialized "
"To launch a multi-device training from your notebook, the `Accelerator` should only be initialized "
"inside your training function. Restart your notebook and make sure no cells initializes an "
"`Accelerator`."
)
# Check for specific libraries known to initialize CUDA that users constantly use
# Check for specific libraries known to initialize device that users constantly use
problematic_imports = are_libraries_initialized("bitsandbytes")
if len(problematic_imports) > 0:
err = (
"Could not start distributed process. Libraries known to initialize CUDA upon import have been "
"Could not start distributed process. Libraries known to initialize device upon import have been "
"imported already. Please keep these imports inside your training function to try and help with this:"
)
for lib_name in problematic_imports:
@ -203,24 +203,26 @@ def notebook_launcher(
# process here (the other ones will be set be the launcher).
with patch_environment(**patched_env):
# First dummy launch
device_type = torch.accelerator.current_accelerator().type if hasattr(torch, "accelerator") else "cuda"
distributed_type = "MULTI_XPU" if device_type == "xpu" else "MULTI_GPU"
if os.environ.get("ACCELERATE_DEBUG_MODE", "false").lower() == "true":
launcher = PrepareForLaunch(test_launch, distributed_type="MULTI_GPU")
launcher = PrepareForLaunch(test_launch, distributed_type=distributed_type)
try:
start_processes(launcher, args=(), nprocs=num_processes, start_method="fork")
except ProcessRaisedException as e:
err = "An issue was found when verifying a stable environment for the notebook launcher."
if "Cannot re-initialize CUDA in forked subprocess" in e.args[0]:
if f"Cannot re-initialize {device_type.upper()} in forked subprocess" in e.args[0]:
raise RuntimeError(
f"{err}"
"This likely stems from an outside import causing issues once the `notebook_launcher()` is called. "
"Please review your imports and test them when running the `notebook_launcher()` to identify "
"which one is problematic and causing CUDA to be initialized."
f"which one is problematic and causing {device_type.upper()} to be initialized."
) from e
else:
raise RuntimeError(f"{err} The following error was raised: {e}") from e
# Now the actual launch
launcher = PrepareForLaunch(function, distributed_type="MULTI_GPU")
print(f"Launching training on {num_processes} GPUs.")
launcher = PrepareForLaunch(function, distributed_type=distributed_type)
print(f"Launching training on {num_processes} {device_type.upper()}s.")
try:
if rdzv_conf is None:
rdzv_conf = {}
@ -244,23 +246,25 @@ def notebook_launcher(
launch_config_kwargs["log_line_prefix_template"] = log_line_prefix_template
elastic_launch(config=LaunchConfig(**launch_config_kwargs), entrypoint=function)(*args)
except ProcessRaisedException as e:
if "Cannot re-initialize CUDA in forked subprocess" in e.args[0]:
if f"Cannot re-initialize {device_type.upper()} in forked subprocess" in e.args[0]:
raise RuntimeError(
"CUDA has been initialized before the `notebook_launcher` could create a forked subprocess. "
f"{device_type.upper()} has been initialized before the `notebook_launcher` could create a forked subprocess. "
"This likely stems from an outside import causing issues once the `notebook_launcher()` is called. "
"Please review your imports and test them when running the `notebook_launcher()` to identify "
"which one is problematic and causing CUDA to be initialized."
f"which one is problematic and causing {device_type.upper()} to be initialized."
) from e
else:
raise RuntimeError(f"An issue was found when launching the training: {e}") from e
else:
# No need for a distributed launch otherwise as it's either CPU, GPU or MPS.
# No need for a distributed launch otherwise as it's either CPU, GPU, XPU or MPS.
if is_mps_available():
os.environ["PYTORCH_ENABLE_MPS_FALLBACK"] = "1"
print("Launching training on MPS.")
elif torch.cuda.is_available():
print("Launching training on one GPU.")
elif torch.xpu.is_available():
print("Launching training on one XPU.")
else:
print("Launching training on CPU.")
function(*args)
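For reference, a minimal hedged sketch of the launcher the docstring above documents; the training body is a placeholder and `num_processes=2` assumes a two-device machine.

```python
from accelerate import notebook_launcher


def training_function():
    from accelerate import Accelerator

    accelerator = Accelerator(mixed_precision="bf16")
    accelerator.print(f"process {accelerator.process_index} on {accelerator.device}")


# Must run before anything in the notebook session touches the device (CUDA/XPU).
notebook_launcher(training_function, args=(), num_processes=2)
```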

View File

@ -132,7 +132,7 @@ class PartialState:
Whether or not to force the script to execute on CPU. Will ignore any accelerators available if set to
`True` and force the execution on the CPU.
kwargs (additional keyword arguments, *optional*):
Additional keyword arguments to pass to the relevent `init_process_group` function. Valid `kwargs` can be
Additional keyword arguments to pass to the relevant `init_process_group` function. Valid `kwargs` can be
found in [`utils.InitProcessGroupKwargs`]. See the example section for detailed usage.
**Available attributes:**
@ -213,12 +213,6 @@ class PartialState:
if self.backend == "tccl":
local_rank = os.environ.get("LOCAL_RANK", -1)
torch.sdaa.set_device(f"sdaa:{local_rank}")
if (
self.backend == "nccl"
and os.environ.get("ACCELERATE_USE_FSDP", "false") == "true"
and os.environ.get("FSDP_OFFLOAD_PARAMS", "false") == "true"
):
self.backend = "cuda:nccl,cpu:gloo"
dist.init_distributed(dist_backend=self.backend, auto_mpi_discovery=False, **kwargs)
# We need to flag to `use_deepspeed` to be True to override `distributed_type` later
use_deepspeed = True
@ -230,6 +224,15 @@ class PartialState:
if self.backend == "tccl":
local_rank = os.environ.get("LOCAL_RANK", -1)
torch.sdaa.set_device(f"sdaa:{local_rank}")
if (
self.backend == "nccl"
and os.environ.get("ACCELERATE_USE_FSDP", "false") == "true"
and (
os.environ.get("FSDP_OFFLOAD_PARAMS", "false") == "true"
or os.environ.get("FSDP_STATE_DICT_TYPE", "SHARDED_STATE_DICT") == "FULL_STATE_DICT"
)
):
self.backend = "cuda:nccl,cpu:gloo"
torch.distributed.init_process_group(backend=self.backend, **kwargs)
# XPU and CPU require special env configs to be set

View File

@ -79,10 +79,6 @@ def mock_training(accelerator, model):
def check_weights(operation, state_1, state_2):
for weight_1, weight_2 in zip(state_1.values(), state_2.values()):
if str(weight_1.device) != torch_device:
weight_1 = weight_1.to(torch_device)
if str(weight_2.device) != torch_device:
weight_2 = weight_2.to(torch_device)
if operation == "same":
assert torch.allclose(weight_1, weight_2)
else:
@ -91,7 +87,7 @@ def check_weights(operation, state_1, state_2):
def check_safetensors_weights(path, model):
safe_state_dict = load_file(path / "model.safetensors")
safe_loaded_model = TinyModel()
safe_loaded_model = TinyModel().to(torch_device)
check_weights("diff", model.state_dict(), safe_loaded_model.state_dict())
safe_loaded_model.load_state_dict(safe_state_dict)
check_weights("same", model.state_dict(), safe_loaded_model.state_dict())
@ -99,7 +95,7 @@ def check_safetensors_weights(path, model):
def check_pytorch_weights(path, model):
nonsafe_state_dict = torch.load(path / "pytorch_model.bin", weights_only=True)
nonsafe_loaded_model = TinyModel()
nonsafe_loaded_model = TinyModel().to(torch_device)
check_weights("diff", model.state_dict(), nonsafe_loaded_model.state_dict())
nonsafe_loaded_model.load_state_dict(nonsafe_state_dict)
check_weights("same", model.state_dict(), nonsafe_loaded_model.state_dict())

View File

@ -625,7 +625,7 @@ def training_check(use_seedable_sampler=False):
msg=lambda msg: f"Did not obtain the same model on CPU or distributed training.\n{msg}",
)
# IPEX support is only for CPU
# IPEX CPU tests
if is_ipex_available():
print("ipex BF16 training check.")
AcceleratorState._reset_state()

View File

@ -61,6 +61,7 @@ from ..utils import (
is_pytest_available,
is_schedulefree_available,
is_sdaa_available,
is_swanlab_available,
is_tensorboard_available,
is_timm_available,
is_torch_version,
@ -68,6 +69,7 @@ from ..utils import (
is_torchao_available,
is_torchdata_stateful_dataloader_available,
is_torchvision_available,
is_trackio_available,
is_transformer_engine_available,
is_transformers_available,
is_triton_available,
@ -249,6 +251,10 @@ def require_fp8(test_case):
return unittest.skipUnless(fp8_is_available, "test requires FP8 support")(test_case)
def require_fsdp2(test_case):
return unittest.skipUnless(is_torch_version(">=", "2.5.0"), "test requires FSDP2 (torch >= 2.5.0)")(test_case)
def require_mlu(test_case):
"""
Decorator marking a test that requires MLU. These tests are skipped when no MLU is available.
@ -454,6 +460,13 @@ def require_wandb(test_case):
return unittest.skipUnless(is_wandb_available(), "test requires wandb")(test_case)
def require_trackio(test_case):
"""
Decorator marking a test that requires trackio installed. These tests are skipped when trackio isn't installed
"""
return unittest.skipUnless(is_trackio_available(), "test requires trackio")(test_case)
def require_comet_ml(test_case):
"""
Decorator marking a test that requires comet_ml installed. These tests are skipped when comet_ml isn't installed
@ -482,6 +495,13 @@ def require_dvclive(test_case):
return unittest.skipUnless(is_dvclive_available(), "test requires dvclive")(test_case)
def require_swanlab(test_case):
"""
Decorator marking a test that requires swanlab installed. These tests are skipped when swanlab isn't installed
"""
return unittest.skipUnless(is_swanlab_available(), "test requires swanlab")(test_case)
def require_pandas(test_case):
"""
Decorator marking a test that requires pandas installed. These tests are skipped when pandas isn't installed
@ -536,7 +556,8 @@ def require_matplotlib(test_case):
_atleast_one_tracker_available = (
any([is_wandb_available(), is_tensorboard_available()]) and not is_comet_ml_available()
any([is_wandb_available(), is_tensorboard_available(), is_trackio_available(), is_swanlab_available()])
and not is_comet_ml_available()
)

View File

@ -34,7 +34,9 @@ from .utils import (
is_comet_ml_available,
is_dvclive_available,
is_mlflow_available,
is_swanlab_available,
is_tensorboard_available,
is_trackio_available,
is_wandb_available,
listify,
)
@ -63,6 +65,12 @@ if is_clearml_available():
if is_dvclive_available():
_available_trackers.append(LoggerType.DVCLIVE)
if is_swanlab_available():
_available_trackers.append(LoggerType.SWANLAB)
if is_trackio_available():
_available_trackers.append(LoggerType.TRACKIO)
logger = get_logger(__name__)
@ -133,7 +141,7 @@ class GeneralTracker:
def start(self):
"""
Lazy initialization of the tracker inside Accelerator to avoid initalizing PartialState before
Lazy initialization of the tracker inside Accelerator to avoid initializing PartialState before
InitProcessGroupKwargs.
"""
pass
@ -332,7 +340,16 @@ class WandBTracker(GeneralTracker):
"""
import wandb
wandb.config.update(values, allow_val_change=True)
if os.environ.get("WANDB_MODE") == "offline":
# In offline mode, restart wandb with config included
if hasattr(self, "run") and self.run:
self.run.finish()
init_kwargs = self.init_kwargs.copy()
init_kwargs["config"] = values
self.run = wandb.init(project=self.run_name, **init_kwargs)
else:
wandb.config.update(values, allow_val_change=True)
logger.debug("Stored initial configuration hyperparameters to WandB")
@on_main_process
@ -411,6 +428,83 @@ class WandBTracker(GeneralTracker):
logger.debug("WandB run closed")
class TrackioTracker(GeneralTracker):
"""
A `Tracker` class that supports `trackio`. Should be initialized at the start of your script.
Args:
run_name (`str`):
The name of the experiment run. Will be used as the `project` name when instantiating trackio.
**kwargs (additional keyword arguments, *optional*):
Additional keyword arguments passed along to the `trackio.init` method. Refer to
[init](https://github.com/gradio-app/trackio/blob/814809552310468b13f84f33764f1369b4e5136c/trackio/__init__.py#L22)
to see all supported keyword arguments.
"""
name = "trackio"
requires_logging_directory = False
main_process_only = False
def __init__(self, run_name: str, **kwargs):
super().__init__()
self.run_name = run_name
self.init_kwargs = kwargs
@on_main_process
def start(self):
import trackio
self.run = trackio.init(project=self.run_name, **self.init_kwargs)
logger.debug(f"Initialized trackio project {self.run_name}")
logger.debug(
"Make sure to log any initial configurations with `self.store_init_configuration` before training!"
)
@property
def tracker(self):
return self.run
@on_main_process
def store_init_configuration(self, values: dict):
"""
Logs `values` as hyperparameters for the run. Should be run at the beginning of your experiment.
Args:
values (Dictionary `str` to `bool`, `str`, `float` or `int`):
Values to be stored as initial hyperparameters as key-value pairs. The values need to have type `bool`,
`str`, `float`, `int`, or `None`.
"""
import trackio
trackio.config.update(values, allow_val_change=True)
logger.debug("Stored initial configuration hyperparameters to trackio")
@on_main_process
def log(self, values: dict, step: Optional[int] = None, **kwargs):
"""
Logs `values` to the current run.
Args:
values (Dictionary `str` to `str`, `float`, `int` or `dict` of `str` to `float`/`int`):
Values to be logged as key-value pairs. The values need to have type `str`, `float`, `int` or `dict` of
`str` to `float`/`int`.
step (`int`, *optional*):
The run step. If included, the log will be affiliated with this step.
kwargs:
Additional keyword arguments passed along to the `trackio.log` method.
"""
self.run.log(values, **kwargs)
logger.debug("Successfully logged to trackio")
@on_main_process
def finish(self):
"""
Closes `trackio` run
"""
self.run.finish()
logger.debug("trackio run closed")
class CometMLTracker(GeneralTracker):
"""
A `Tracker` class that supports `comet_ml`. Should be initialized at the start of your script.
@ -1061,6 +1155,106 @@ class DVCLiveTracker(GeneralTracker):
self.live.end()
class SwanLabTracker(GeneralTracker):
"""
A `Tracker` class that supports `swanlab`. Should be initialized at the start of your script.
Args:
run_name (`str`):
The name of the experiment run.
**kwargs (additional keyword arguments, *optional*):
Additional keyword arguments passed along to the `swanlab.init` method.
"""
name = "swanlab"
requires_logging_directory = False
main_process_only = False
def __init__(self, run_name: str, **kwargs):
super().__init__()
self.run_name = run_name
self.init_kwargs = kwargs
@on_main_process
def start(self):
import swanlab
self.run = swanlab.init(project=self.run_name, **self.init_kwargs)
swanlab.config["FRAMEWORK"] = "🤗Accelerate" # add accelerate logo in config
logger.debug(f"Initialized SwanLab project {self.run_name}")
logger.debug(
"Make sure to log any initial configurations with `self.store_init_configuration` before training!"
)
@property
def tracker(self):
return self.run
@on_main_process
def store_init_configuration(self, values: dict):
"""
Logs `values` as hyperparameters for the run. Should be run at the beginning of your experiment.
Args:
values (Dictionary `str` to `bool`, `str`, `float` or `int`):
Values to be stored as initial hyperparameters as key-value pairs. The values need to have type `bool`,
`str`, `float`, `int`, or `None`.
"""
import swanlab
swanlab.config.update(values, allow_val_change=True)
logger.debug("Stored initial configuration hyperparameters to SwanLab")
@on_main_process
def log(self, values: dict, step: Optional[int] = None, **kwargs):
"""
Logs `values` to the current run.
Args:
values (Dictionary `str` to `DataType`):
Values to be logged as key-value pairs. Keys must be strings made up of 0-9, a-z, A-Z, " ", "_", "-", "/". The
values must be a `float`, a float-convertible object, an `int`, or a `swanlab.data.BaseType`.
step (`int`, *optional*):
The step number of the current data; if not provided, it is automatically incremented. If a step is
duplicated, the data is ignored.
kwargs:
Additional keyword arguments passed along to the `swanlab.log` method, for example:
print_to_console (`bool`, *optional*, defaults to `False`):
Whether to print the data to the console.
"""
self.run.log(values, step=step, **kwargs)
logger.debug("Successfully logged to SwanLab")
@on_main_process
def log_images(self, values: dict, step: Optional[int] = None, **kwargs):
"""
Logs `images` to the current run.
Args:
values (Dictionary `str` to `List` of `np.ndarray` or `PIL.Image`):
Values to be logged as key-value pairs. The values need to have type `List` of `np.ndarray` or `PIL.Image`.
step (`int`, *optional*):
The run step. If included, the log will be affiliated with this step.
kwargs:
Additional keyword arguments passed along to the `swanlab.log` method, for example:
print_to_console (`bool`, *optional*, defaults to `False`):
Whether to print the data to the console.
"""
import swanlab
for k, v in values.items():
self.log({k: [swanlab.Image(image) for image in v]}, step=step, **kwargs)
logger.debug("Successfully logged images to SwanLab")
@on_main_process
def finish(self):
"""
Closes `swanlab` writer
"""
self.run.finish()
logger.debug("SwanLab run closed")
LOGGER_TYPE_TO_CLASS = {
"aim": AimTracker,
"comet_ml": CometMLTracker,
@ -1069,6 +1263,8 @@ LOGGER_TYPE_TO_CLASS = {
"wandb": WandBTracker,
"clearml": ClearMLTracker,
"dvclive": DVCLiveTracker,
"swanlab": SwanLabTracker,
"trackio": TrackioTracker,
}
@ -1090,9 +1286,12 @@ def filter_trackers(
- `"all"`
- `"tensorboard"`
- `"wandb"`
- `"trackio"`
- `"aim"`
- `"comet_ml"`
- `"mlflow"`
- `"dvclive"`
- `"swanlab"`
If `"all"` is selected, will pick up all available trackers in the environment and initialize them. Can
also accept implementations of `GeneralTracker` for custom trackers, and can be combined with `"all"`.
logging_dir (`str`, `os.PathLike`, *optional*):

View File

@ -121,6 +121,7 @@ from .imports import (
is_sagemaker_available,
is_schedulefree_available,
is_sdaa_available,
is_swanlab_available,
is_tensorboard_available,
is_timm_available,
is_torch_xla_available,
@ -128,6 +129,7 @@ from .imports import (
is_torchdata_available,
is_torchdata_stateful_dataloader_available,
is_torchvision_available,
is_trackio_available,
is_transformer_engine_available,
is_transformers_available,
is_triton_available,
@ -281,6 +283,7 @@ from .other import (
is_port_in_use,
load,
merge_dicts,
model_has_dtensor,
recursive_getattr,
save,
wait_for_everyone,

View File

@ -289,7 +289,7 @@ class InitProcessGroupKwargs(KwargsHandler):
# Literals
Backend = Literal["MSAMP", "TE"]
OptLevel = Literal["O1", "O2"]
FP8Format = Literal["E4M3", "HYBRID"]
FP8Format = Literal["HYBRID", "E4M3", "E5M2"]
AmaxComputeAlgorithm = Literal["max", "most_recent"]
@ -342,8 +342,8 @@ class TERecipeKwargs(KwargsHandler):
interval (`int`, *optional*, default to 1):
The interval to use for how often the scaling factor is recomputed.
fp8_format (`str`, *optional*, default to "HYBRID"):
The format to use for the FP8 recipe. Must be one of `HYBRID` or `E4M3`. (Generally `HYBRID` for training,
`E4M3` for evaluation)
The format to use for the FP8 recipe. Must be one of `HYBRID`, `E4M3` or `E5M2`. (Generally `HYBRID` for
training, `E4M3` or `E5M2` for evaluation)
amax_history_len (`int`, *optional*, default to 1024):
The length of the history to use for the scaling factor computation
amax_compute_algo (`str`, *optional*, default to "most_recent"):
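Since `E5M2` is now an accepted recipe format, a short sketch of selecting it through `TERecipeKwargs` (assumes an FP8-capable GPU with TransformerEngine installed):

```
from accelerate import Accelerator
from accelerate.utils import TERecipeKwargs

fp8_kwargs = TERecipeKwargs(fp8_format="E5M2", amax_history_len=1024, amax_compute_algo="most_recent")
accelerator = Accelerator(mixed_precision="fp8", kwargs_handlers=[fp8_kwargs])
```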
@ -616,8 +616,10 @@ class FP8BackendType(str, enum.Enum):
"""
# Subclassing str as well as Enum allows the `FP8BackendType` to be JSON-serializable out of the box.
NO = "NO"
TE = "TE"
MSAMP = "MSAMP"
AO = "AO"
class ComputeEnvironment(str, enum.Enum):
@ -699,18 +701,24 @@ class LoggerType(BaseEnum):
- **ALL** -- all available trackers in the environment that are supported
- **TENSORBOARD** -- TensorBoard as an experiment tracker
- **WANDB** -- wandb as an experiment tracker
- **TRACKIO** -- trackio as an experiment tracker
- **COMETML** -- comet_ml as an experiment tracker
- **MLFLOW** -- mlflow as an experiment tracker
- **CLEARML** -- clearml as an experiment tracker
- **DVCLIVE** -- dvclive as an experiment tracker
- **SWANLAB** -- swanlab as an experiment tracker
"""
ALL = "all"
AIM = "aim"
TENSORBOARD = "tensorboard"
WANDB = "wandb"
TRACKIO = "trackio"
COMETML = "comet_ml"
MLFLOW = "mlflow"
CLEARML = "clearml"
DVCLIVE = "dvclive"
SWANLAB = "swanlab"
class PrecisionType(str, BaseEnum):
@ -2088,6 +2096,8 @@ class TorchTensorParallelPlugin:
# support for other devices has to be investigated
if is_hpu_available(init_hccl=True):
device = "hpu"
elif is_xpu_available():
device = "xpu"
else:
device = "cuda"

View File

@ -261,22 +261,36 @@ class DeepSpeedEngineWrapper:
def __init__(self, engine):
self.engine = engine
def backward(self, loss, **kwargs):
def backward(self, loss, sync_gradients=True, **kwargs):
# Set gradient accumulation boundary based on Accelerate's sync_gradients state
# This tells DeepSpeed whether this is the final micro-batch before gradient sync
self.engine.set_gradient_accumulation_boundary(is_boundary=sync_gradients)
# runs backpropagation and handles mixed precision
self.engine.backward(loss, **kwargs)
# Deepspeed's `engine.step` performs the following operations:
# - gradient accumulation check
# - gradient clipping
# - optimizer step
# - zero grad
# - checking overflow
# - lr_scheduler step (only if engine.lr_scheduler is not None)
self.engine.step()
# Only perform step and related operations at gradient accumulation boundaries
if sync_gradients:
# Deepspeed's `engine.step` performs the following operations:
# - gradient accumulation check
# - gradient clipping
# - optimizer step
# - zero grad
# - checking overflow
# - lr_scheduler step (only if engine.lr_scheduler is not None)
self.engine.step()
# and this plugin overrides the above calls with no-ops when Accelerate runs under
# Deepspeed, but allows normal functionality for non-Deepspeed cases thus enabling a simple
# training loop that works transparently under many training regimes.
def get_global_grad_norm(self):
"""Get the global gradient norm from DeepSpeed engine."""
grad_norm = self.engine.get_global_grad_norm()
# Convert to scalar if it's a tensor
if hasattr(grad_norm, "item"):
return grad_norm.item()
return grad_norm
class DeepSpeedOptimizerWrapper(AcceleratedOptimizer):
"""

View File

@ -179,10 +179,9 @@ def load_fsdp_model(fsdp_plugin, accelerator, model, input_dir, model_index=0, a
else nullcontext()
)
sd_options = _prepare_sd_options(fsdp_plugin)
with ctx:
if fsdp_plugin.state_dict_type == StateDictType.FULL_STATE_DICT:
if type(model) is not FSDP and accelerator.process_index != 0:
if type(model) is not FSDP and accelerator.process_index != 0 and not accelerator.is_fsdp2:
if not fsdp_plugin.sync_module_states and fsdp_plugin.fsdp_version == 1:
raise ValueError(
"Set the `sync_module_states` flag to `True` so that model states are synced across processes when "
@ -192,7 +191,12 @@ def load_fsdp_model(fsdp_plugin, accelerator, model, input_dir, model_index=0, a
weights_name = f"{FSDP_MODEL_NAME}.bin" if model_index == 0 else f"{FSDP_MODEL_NAME}_{model_index}.bin"
input_model_file = os.path.join(input_dir, weights_name)
logger.info(f"Loading model from {input_model_file}")
state_dict = torch.load(input_model_file, weights_only=True)
# we want an empty state dict for FSDP2 as we use `broadcast_from_rank0`
load_model = not accelerator.is_fsdp2 or accelerator.is_main_process
if load_model:
state_dict = torch.load(input_model_file, weights_only=True)
else:
state_dict = {}
logger.info(f"Model loaded from {input_model_file}")
elif fsdp_plugin.state_dict_type == StateDictType.LOCAL_STATE_DICT:
weights_name = (
@ -498,10 +502,10 @@ def fsdp2_load_full_state_dict(accelerator, model: torch.nn.Module, full_sd: dic
if accelerator.is_main_process:
for (param_name, full_param), sharded_param in zip(full_sd.items(), meta_sharded_sd.values()):
full_param = full_param.detach().cuda()
mesh = sharded_param.device_mesh
dist.broadcast(full_param, src=0, group=mesh.get_group())
sharded_tensor = distribute_tensor(full_param, mesh, sharded_param.placements)
device_mesh = sharded_param.device_mesh
full_param = full_param.detach().to(device_mesh.device_type)
dist.broadcast(full_param, src=0, group=device_mesh.get_group())
sharded_tensor = distribute_tensor(full_param, device_mesh, sharded_param.placements)
to_contiguous, casting_dtype = _infer_parameter_dtype(
model,
param_name,
@ -512,10 +516,10 @@ def fsdp2_load_full_state_dict(accelerator, model: torch.nn.Module, full_sd: dic
# We need this else to have a matching `broadcast` for all of the ranks, else we deadlock
else:
for param_name, sharded_param in meta_sharded_sd.items():
full_tensor = torch.empty(sharded_param.size(), device="cuda", dtype=sharded_param.dtype)
mesh = sharded_param.device_mesh
dist.broadcast(full_tensor, src=0, group=mesh.get_group())
sharded_tensor = distribute_tensor(full_tensor, mesh, sharded_param.placements)
device_mesh = sharded_param.device_mesh
full_tensor = torch.empty(sharded_param.size(), device=device_mesh.device_type, dtype=sharded_param.dtype)
dist.broadcast(full_tensor, src=0, group=device_mesh.get_group())
sharded_tensor = distribute_tensor(full_tensor, device_mesh, sharded_param.placements)
to_contiguous, casting_dtype = _infer_parameter_dtype(
model,
param_name,

View File

@ -15,6 +15,7 @@
import importlib
import importlib.metadata
import os
import sys
import warnings
from functools import lru_cache, wraps
@ -281,6 +282,14 @@ def is_comet_ml_available():
return _is_package_available("comet_ml")
def is_swanlab_available():
return _is_package_available("swanlab")
def is_trackio_available():
return sys.version_info >= (3, 10) and _is_package_available("trackio")
def is_boto3_available():
return _is_package_available("boto3")

View File

@ -89,9 +89,9 @@ def setup_fp8_env(args: argparse.Namespace, current_env: dict[str, str]):
value = getattr(args, arg)
if value is not None:
if arg == "fp8_override_linear_precision":
current_env[prefix + "FP8_OVERRIDE_FPROP"] = value[0]
current_env[prefix + "FP8_OVERRIDE_DGRAD"] = value[1]
current_env[prefix + "FP8_OVERRIDE_WGRAD"] = value[2]
current_env[prefix + "FP8_OVERRIDE_FPROP"] = str(value[0])
current_env[prefix + "FP8_OVERRIDE_DGRAD"] = str(value[1])
current_env[prefix + "FP8_OVERRIDE_WGRAD"] = str(value[2])
else:
current_env[f"{prefix}{arg.upper()}"] = str(getattr(args, arg))
return current_env

View File

@ -121,7 +121,7 @@ def find_executable_batch_size(
):
"""
A basic decorator that will try to execute `function`. If it fails from exceptions related to out-of-memory or
CUDNN, the batch size is cut in half and passed to `function`
CUDNN, the batch size is multiplied by 0.9 and passed to `function`
`function` must take in a `batch_size` parameter as its first argument.
@ -153,7 +153,7 @@ def find_executable_batch_size(
def reduce_batch_size_fn():
nonlocal batch_size
batch_size = batch_size // 2
batch_size = int(batch_size * 0.9)
return batch_size
def decorator(*args, **kwargs):
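A small sketch of the new 10% back-off in action; the fake error message is one of the strings the utility recognises as an out-of-memory failure:

```
from accelerate.utils import find_executable_batch_size

@find_executable_batch_size(starting_batch_size=128)
def train(batch_size):
    if batch_size > 100:  # pretend anything above 100 runs out of memory
        raise RuntimeError("CUDA out of memory.")
    return batch_size

print(train())  # tries 128, 115, 103, then succeeds at 92
```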

View File

@ -222,6 +222,8 @@ def set_module_tensor_to_device(
dtype: Optional[Union[str, torch.dtype]] = None,
fp16_statistics: Optional[torch.HalfTensor] = None,
tied_params_map: Optional[dict[int, dict[torch.device, torch.Tensor]]] = None,
non_blocking: bool = False,
clear_cache: bool = True,
):
"""
A helper function to set a given tensor (parameter or buffer) of a module on a specific device (note that doing
@ -245,6 +247,10 @@ def set_module_tensor_to_device(
A map of current data pointers to dictionaries of devices to already dispatched tied weights. For a given
execution device, this parameter is useful to reuse the first available pointer of a shared weight on the
device for all others, instead of duplicating memory.
non_blocking (`bool`, *optional*, defaults to `False`):
If `True`, the device transfer will be asynchronous with respect to the host, if possible.
clear_cache (`bool`, *optional*, defaults to `True`):
Whether or not to clear the device cache after setting the tensor on the device.
"""
# Recurse if needed
if "." in tensor_name:
@ -295,9 +301,9 @@ def set_module_tensor_to_device(
if dtype is None:
# For compatibility with PyTorch load_state_dict which converts state dict dtype to existing dtype in model
value = value.to(old_value.dtype)
value = value.to(old_value.dtype, non_blocking=non_blocking)
elif not str(value.dtype).startswith(("torch.uint", "torch.int", "torch.bool")):
value = value.to(dtype)
value = value.to(dtype, non_blocking=non_blocking)
device_quantization = None
with torch.no_grad():
@ -326,15 +332,15 @@ def set_module_tensor_to_device(
if "xpu" in str(device) and not is_xpu_available():
raise ValueError(f'{device} is not available, you should use device="cpu" instead')
if value is None:
new_value = old_value.to(device)
new_value = old_value.to(device, non_blocking=non_blocking)
if dtype is not None and device in ["meta", torch.device("meta")]:
if not str(old_value.dtype).startswith(("torch.uint", "torch.int", "torch.bool")):
new_value = new_value.to(dtype)
new_value = new_value.to(dtype, non_blocking=non_blocking)
if not is_buffer:
module._parameters[tensor_name] = param_cls(new_value, requires_grad=old_value.requires_grad)
elif isinstance(value, torch.Tensor):
new_value = value.to(device)
new_value = value.to(device, non_blocking=non_blocking)
else:
new_value = torch.tensor(value, device=device)
if device_quantization is not None:
@ -347,24 +353,30 @@ def set_module_tensor_to_device(
if param_cls.__name__ in ["Int8Params", "FP4Params", "Params4bit"]:
if param_cls.__name__ == "Int8Params" and new_value.dtype == torch.float32:
# downcast to fp16 if any - needed for 8bit serialization
new_value = new_value.to(torch.float16)
new_value = new_value.to(torch.float16, non_blocking=non_blocking)
# quantize module that are going to stay on the cpu so that we offload quantized weights
if device == "cpu" and param_cls.__name__ == "Int8Params":
new_value = param_cls(new_value, requires_grad=old_value.requires_grad, **kwargs).to(0).to("cpu")
new_value.CB = new_value.CB.to("cpu")
new_value.SCB = new_value.SCB.to("cpu")
else:
new_value = param_cls(new_value, requires_grad=old_value.requires_grad, **kwargs).to(device)
new_value = param_cls(new_value, requires_grad=old_value.requires_grad, **kwargs).to(
device, non_blocking=non_blocking
)
elif param_cls.__name__ in ["QTensor", "QBitsTensor"]:
new_value = torch.nn.Parameter(new_value, requires_grad=old_value.requires_grad).to(device)
new_value = torch.nn.Parameter(new_value, requires_grad=old_value.requires_grad).to(
device, non_blocking=non_blocking
)
elif param_cls.__name__ in ["AffineQuantizedTensor"]:
new_value = new_value.to(device)
new_value = new_value.to(device, non_blocking=non_blocking)
else:
new_value = param_cls(new_value, requires_grad=old_value.requires_grad).to(device)
new_value = param_cls(new_value, requires_grad=old_value.requires_grad).to(
device, non_blocking=non_blocking
)
module._parameters[tensor_name] = new_value
if fp16_statistics is not None:
module._parameters[tensor_name].SCB = fp16_statistics.to(device)
module._parameters[tensor_name].SCB = fp16_statistics.to(device, non_blocking=non_blocking)
del fp16_statistics
# as we put the weight to meta, it doesn't have SCB attr anymore. make sure that it is not a meta weight
if (
@ -390,8 +402,9 @@ def set_module_tensor_to_device(
device_index = torch.device(device).index if torch.device(device).type == "cuda" else None
if not getattr(module.weight, "quant_state", None) and device_index is not None:
module.weight = module.weight.cuda(device_index)
# clean pre and post forward hook
if device != "cpu":
if clear_cache and device != "cpu":
clear_device_cache()
# When handling tied weights, we update tied_params_map to keep track of the tied weights that have already been allocated on the device in
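Both new arguments are opt-in; a short sketch of passing them explicitly, using a CPU-only toy module so the transfer itself is trivial:

```
import torch
from accelerate.utils import set_module_tensor_to_device

model = torch.nn.Linear(4, 4)
new_weight = torch.randn(4, 4)

set_module_tensor_to_device(
    model,
    "weight",
    device="cpu",
    value=new_weight,
    non_blocking=True,   # asynchronous copy when the backend supports it
    clear_cache=False,   # skip the device-cache flush after the assignment
)
```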
@ -1594,6 +1607,14 @@ def check_device_map(model: nn.Module, device_map: dict[str, Union[int, str, tor
model (`torch.nn.Module`): The model to check the device map against.
device_map (`Dict[str, Union[int, str, torch.device]]`): The device map to check.
"""
all_module_names = dict(model.named_modules())
invalid_keys = [k for k in device_map if k != "" and k not in all_module_names]
if invalid_keys:
warnings.warn(
f"The following device_map keys do not match any submodules in the model: {invalid_keys}", UserWarning
)
all_model_tensors = [name for name, _ in model.state_dict().items()]
for module_name in device_map.keys():
if module_name == "":

View File

@ -194,6 +194,26 @@ def compile_regions_deepspeed(module: torch.nn.Module, **compile_kwargs):
module.compile(**compile_kwargs)
def model_has_dtensor(model: torch.nn.Module) -> bool:
"""
Check if the model has DTensor parameters.
Args:
model (`torch.nn.Module`):
The model to check.
Returns:
`bool`: Whether the model has DTensor parameters.
"""
if is_torch_version(">=", "2.5.0"):
from torch.distributed.tensor import DTensor
else:
# from torch 2.0.0 (oldest supported accelerate torch version), DTensor is in torch.distributed._tensor
from torch.distributed._tensor import DTensor
return any(isinstance(p, DTensor) for p in model.parameters())
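A tiny sketch of the new helper; it returns `True` only for models whose parameters are already `DTensor`s (e.g. FSDP2 or tensor-parallel sharded models):

```
import torch
from accelerate.utils import model_has_dtensor

model = torch.nn.Linear(8, 8)
print(model_has_dtensor(model))  # False for a plain, unsharded module
```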
def extract_model_from_parallel(
model, keep_fp32_wrapper: bool = True, keep_torch_compile: bool = True, recursive: bool = False
):

View File

@ -0,0 +1,240 @@
# Copyright 2022 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import inspect
import json
from pathlib import Path
import torch
from torch.utils.data import DataLoader
from transformers import AutoModel
from transformers.trainer_utils import set_seed
from accelerate.accelerator import Accelerator
from accelerate.test_utils.testing import AccelerateTestCase, require_deepspeed
from accelerate.test_utils.training import RegressionDataset
from accelerate.utils import patch_environment
from accelerate.utils.dataclasses import DeepSpeedPlugin
set_seed(42)
GPT2_TINY = "hf-internal-testing/tiny-random-gpt2"
ZERO2 = "zero2"
ZERO3 = "zero3"
FP16 = "fp16"
@require_deepspeed
class DeepSpeedGradientAccumulationTest(AccelerateTestCase):
def setUp(self):
super().setUp()
self._test_file_path = inspect.getfile(self.__class__)
path = Path(self._test_file_path).resolve()
self.test_file_dir_str = str(path.parents[0])
self.ds_config_file = dict(
zero2=f"{self.test_file_dir_str}/ds_config_zero2.json",
zero3=f"{self.test_file_dir_str}/ds_config_zero3.json",
)
# Load config files
with open(self.ds_config_file[ZERO2], encoding="utf-8") as f:
config_zero2 = json.load(f)
with open(self.ds_config_file[ZERO3], encoding="utf-8") as f:
config_zero3 = json.load(f)
config_zero3["zero_optimization"]["stage3_gather_16bit_weights_on_model_save"] = False
self.ds_config_dict = dict(zero2=config_zero2, zero3=config_zero3)
self.dist_env = dict(
ACCELERATE_USE_DEEPSPEED="true",
MASTER_ADDR="localhost",
MASTER_PORT="10999",
RANK="0",
LOCAL_RANK="0",
WORLD_SIZE="1",
)
def test_gradient_accumulation_boundary_integration(self):
"""Test that gradient accumulation boundaries are automatically handled by DeepSpeed integration."""
gradient_accumulation_steps = 4
deepspeed_plugin = DeepSpeedPlugin(
gradient_accumulation_steps=gradient_accumulation_steps,
gradient_clipping=1.0,
zero_stage=2,
offload_optimizer_device="cpu",
offload_param_device="cpu",
zero3_save_16bit_model=False,
zero3_init_flag=False,
)
with patch_environment(**self.dist_env):
accelerator = Accelerator(mixed_precision="fp16", deepspeed_plugin=deepspeed_plugin)
# Setup simple training components
train_set = RegressionDataset(length=80)
train_dataloader = DataLoader(train_set, batch_size=16, shuffle=True)
model = AutoModel.from_pretrained(GPT2_TINY)
optimizer = torch.optim.AdamW(model.parameters(), lr=5e-5)
model, optimizer, train_dataloader = accelerator.prepare(model, optimizer, train_dataloader)
model.train()
# Test gradient accumulation with accumulate context manager
batch_data = next(iter(train_dataloader))
# Create proper input format for GPT2 model (RegressionDataset returns {"x": scalar, "y": scalar})
# We need to create dummy input_ids for the GPT2 model
batch_size = batch_data["x"].shape[0] if isinstance(batch_data["x"], torch.Tensor) else 1
# Create dummy input_ids for GPT2 model and move to same device as model
device = next(model.parameters()).device
input_ids = torch.randint(0, 1000, (batch_size, 10), device=device) # batch_size x sequence_length
inputs = {"input_ids": input_ids}
# Track sync_gradients values to verify correct gradient accumulation behavior
sync_values = []
# Simulate gradient accumulation steps
for micro_step in range(gradient_accumulation_steps):
with accelerator.accumulate(model):
sync_values.append(accelerator.sync_gradients)
outputs = model(**inputs)
# Use the last hidden state and create a simple loss
prediction = outputs.last_hidden_state.mean()
loss = prediction.sum() # Simple scalar loss
# This should automatically handle gradient accumulation boundaries
accelerator.backward(loss)
if accelerator.sync_gradients:
optimizer.step()
optimizer.zero_grad()
# Verify gradient accumulation pattern was correct
# Should be False for first 3 steps, True for the last step
expected_sync = [False, False, False, True]
self.assertEqual(sync_values, expected_sync)
# Reset step counter for accelerator
accelerator.step = 0
def test_clip_grad_norm_returns_deepspeed_grad_norm(self):
"""Test that clip_grad_norm_ works with DeepSpeed and returns gradient norm when available."""
deepspeed_plugin = DeepSpeedPlugin(
gradient_accumulation_steps=1,
gradient_clipping=1.0,
zero_stage=2,
offload_optimizer_device="cpu",
offload_param_device="cpu",
zero3_save_16bit_model=False,
zero3_init_flag=False,
)
with patch_environment(**self.dist_env):
accelerator = Accelerator(mixed_precision="fp16", deepspeed_plugin=deepspeed_plugin)
# Setup simple model
model = AutoModel.from_pretrained(GPT2_TINY)
optimizer = torch.optim.AdamW(model.parameters(), lr=5e-5)
# Create a simple dataloader for prepare to work
train_set = RegressionDataset(length=16)
train_dataloader = DataLoader(train_set, batch_size=16, shuffle=True)
model, optimizer, train_dataloader = accelerator.prepare(model, optimizer, train_dataloader)
# Perform a forward and backward pass to generate gradients
batch_data = next(iter(train_dataloader))
batch_size = len(batch_data["x"]) if isinstance(batch_data["x"], torch.Tensor) else 1
# Create dummy input_ids for GPT2 model and move to same device as model
device = next(model.parameters()).device
input_ids = torch.randint(0, 1000, (batch_size, 10), device=device)
inputs = {"input_ids": input_ids}
# Forward pass
outputs = model(**inputs)
prediction = outputs.last_hidden_state.mean()
loss = prediction.sum()
# Backward pass to generate gradients
accelerator.backward(loss)
# Test that gradient clipping works and returns a value
grad_norm = accelerator.clip_grad_norm_(model.parameters(), max_norm=1.0)
# After backward pass, we should get a valid gradient norm (either from DeepSpeed or fallback)
self.assertIsInstance(grad_norm, (int, float, type(None)))
if grad_norm is not None:
self.assertGreaterEqual(grad_norm, 0.0)
def test_accelerator_backward_passes_sync_gradients(self):
"""Test that Accelerator.backward() passes sync_gradients to DeepSpeed wrapper."""
deepspeed_plugin = DeepSpeedPlugin(
gradient_accumulation_steps=2,
gradient_clipping=1.0,
zero_stage=2,
offload_optimizer_device="cpu",
offload_param_device="cpu",
zero3_save_16bit_model=False,
zero3_init_flag=False,
)
with patch_environment(**self.dist_env):
accelerator = Accelerator(mixed_precision="fp16", deepspeed_plugin=deepspeed_plugin)
# Setup simple model and data
model = AutoModel.from_pretrained(GPT2_TINY)
optimizer = torch.optim.AdamW(model.parameters(), lr=5e-5)
train_set = RegressionDataset(length=16)
train_dataloader = DataLoader(train_set, batch_size=8, shuffle=True)
model, optimizer, train_dataloader = accelerator.prepare(model, optimizer, train_dataloader)
# Track sync_gradients values during backward calls
sync_values = []
# Test two gradient accumulation steps
batch_data = next(iter(train_dataloader))
# Create proper input format for GPT2 model
batch_size = len(batch_data["x"]) if isinstance(batch_data["x"], torch.Tensor) else 1
# Create dummy input_ids for GPT2 model and move to same device as model
device = next(model.parameters()).device
input_ids = torch.randint(0, 1000, (batch_size, 10), device=device)
inputs = {"input_ids": input_ids}
# First step - should have sync_gradients=False
with accelerator.accumulate(model):
sync_values.append(accelerator.sync_gradients)
outputs = model(**inputs)
prediction = outputs.last_hidden_state.mean()
loss = prediction # Simple loss
accelerator.backward(loss)
# Second step - should have sync_gradients=True
with accelerator.accumulate(model):
sync_values.append(accelerator.sync_gradients)
outputs = model(**inputs)
prediction = outputs.last_hidden_state.mean()
loss = prediction # Simple loss
accelerator.backward(loss)
# Verify sync_gradients pattern was correct
self.assertEqual(len(sync_values), 2)
self.assertFalse(sync_values[0]) # First step: not syncing
self.assertTrue(sync_values[1]) # Second step: syncing

View File

@ -29,6 +29,7 @@ from accelerate.test_utils.testing import (
get_launch_command,
path_in_accelerate_package,
require_fp16,
require_fsdp2,
require_multi_device,
require_non_cpu,
require_non_torch_xla,
@ -37,7 +38,6 @@ from accelerate.test_utils.testing import (
)
from accelerate.utils import is_bf16_available, is_fp16_available, is_hpu_available, patch_environment, set_seed
from accelerate.utils.constants import (
FSDP2_PYTORCH_VERSION,
FSDP2_STATE_DICT_TYPE,
FSDP_AUTO_WRAP_POLICY,
FSDP_BACKWARD_PREFETCH,
@ -46,7 +46,6 @@ from accelerate.utils.constants import (
)
from accelerate.utils.dataclasses import FullyShardedDataParallelPlugin
from accelerate.utils.fsdp_utils import disable_fsdp_ram_efficient_loading, enable_fsdp_ram_efficient_loading
from accelerate.utils.versions import is_torch_version
set_seed(42)
@ -63,10 +62,6 @@ if is_fp16_available():
if is_bf16_available():
dtypes.append(BF16)
FSDP_VERSIONS = [1]
if is_torch_version(">=", FSDP2_PYTORCH_VERSION):
FSDP_VERSIONS.append(2)
@require_non_cpu
@require_non_torch_xla
@ -90,24 +85,7 @@ class FSDPPluginIntegration(AccelerateTestCase):
2: self.fsdp2_env,
}
def run(self, result=None):
"""Override run to get the current test name and format failures to include FSDP version."""
test_method = getattr(self, self._testMethodName)
orig_test_method = test_method
def test_wrapper(*args, **kwargs):
for fsdp_version in FSDP_VERSIONS:
try:
self.current_fsdp_version = fsdp_version
return orig_test_method(*args, **kwargs)
except Exception as e:
raise type(e)(f"FSDP version {fsdp_version}: {str(e)}") from e
setattr(self, self._testMethodName, test_wrapper)
try:
return super().run(result)
finally:
setattr(self, self._testMethodName, orig_test_method)
self.current_fsdp_version = 1
def test_sharding_strategy(self):
from torch.distributed.fsdp.fully_sharded_data_parallel import ShardingStrategy
@ -421,6 +399,15 @@ class FSDPPluginIntegration(AccelerateTestCase):
assert os.environ.get("FSDP_CPU_RAM_EFFICIENT_LOADING") == "False"
@require_fsdp2
@require_non_cpu
@require_non_torch_xla
class FSDP2PluginIntegration(FSDPPluginIntegration):
def setUp(self):
super().setUp()
self.current_fsdp_version = 2
@run_first
# Skip this test when TorchXLA is available because accelerate.launch does not support TorchXLA FSDP.
@require_non_torch_xla
@ -462,24 +449,7 @@ class FSDPIntegrationTest(TempDirTestCase):
self.n_train = 160
self.n_val = 160
def run(self, result=None):
"""Override run to get the current test name and format failures to include FSDP version."""
test_method = getattr(self, self._testMethodName)
orig_test_method = test_method
def test_wrapper(*args, **kwargs):
for fsdp_version in FSDP_VERSIONS:
try:
self.current_fsdp_version = fsdp_version
return orig_test_method(*args, **kwargs)
except Exception as e:
raise type(e)(f"FSDP version {fsdp_version}: {str(e)}") from e
setattr(self, self._testMethodName, test_wrapper)
try:
return super().run(result)
finally:
setattr(self, self._testMethodName, orig_test_method)
self.current_fsdp_version = 1
@require_fp16
def test_performance(self):
@ -633,3 +603,15 @@ class FSDPIntegrationTest(TempDirTestCase):
)
with patch_environment(omp_num_threads=1):
execute_subprocess_async(cmd_config)
@require_fsdp2
@run_first
# Skip this test when TorchXLA is available because accelerate.launch does not support TorchXLA FSDP.
@require_non_torch_xla
@require_multi_device
@slow
class FSDP2IntegrationTest(FSDPIntegrationTest):
def setUp(self):
super().setUp()
self.current_fsdp_version = 2

View File

@ -34,7 +34,6 @@ else:
backend = "inductor"
@require_non_hpu
@require_huggingface_suite
class RegionalCompilationTester(unittest.TestCase):
def _get_model_and_inputs(self):
@ -109,6 +108,7 @@ class RegionalCompilationTester(unittest.TestCase):
release_memory(model, full_compilation_model, regional_compilation_model)
@slow
@require_non_hpu
@require_non_cpu
@require_huggingface_suite
def test_regional_compilation_inference_speedup(self):

View File

@ -239,7 +239,10 @@ class FeatureExamplesTests(TempDirTestCase):
run_command(self.launch_args + testargs)
@require_trackers
@mock.patch.dict(os.environ, {"WANDB_MODE": "offline", "DVCLIVE_TEST": "true"})
@mock.patch.dict(
os.environ,
{"WANDB_MODE": "offline", "DVCLIVE_TEST": "true", "SWANLAB_MODE": "offline"},
)
def test_tracking(self):
with tempfile.TemporaryDirectory() as tmpdir:
testargs = f"""

View File

@ -12,9 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import json
import os
import tempfile
import textwrap
import unittest
from pathlib import Path
import torch
@ -32,16 +36,18 @@ from accelerate.test_utils import (
from accelerate.test_utils.testing import require_deepspeed, run_command
from accelerate.utils import (
AORecipeKwargs,
FP8RecipeKwargs,
TERecipeKwargs,
has_ao_layers,
has_transformer_engine_layers,
is_torchao_available,
is_transformer_engine_available,
)
def can_convert_te_model():
accelerator_kwargs = {"mixed_precision": "fp8", "kwargs_handlers": [FP8RecipeKwargs(backend="TE")]}
def can_convert_te_model(from_config=False):
if not from_config:
accelerator_kwargs = {"mixed_precision": "fp8", "kwargs_handlers": [TERecipeKwargs()]}
else:
accelerator_kwargs = {}
accelerator = Accelerator(**accelerator_kwargs)
dataloader = torch.utils.data.DataLoader(torch.randn(10, 32), batch_size=2)
model = torch.nn.Sequential(torch.nn.Linear(32, 32), torch.nn.Linear(32, 16))
@ -58,10 +64,14 @@ def maintain_proper_deepspeed_config(expected_version):
)
def can_convert_ao_model():
def can_convert_ao_model(from_config=False):
from transformers import AutoModelForSequenceClassification
accelerator_kwargs = {"mixed_precision": "fp8", "kwargs_handlers": [AORecipeKwargs()]}
if not from_config:
accelerator_kwargs = {"mixed_precision": "fp8", "kwargs_handlers": [AORecipeKwargs()]}
else:
accelerator_kwargs = {}
accelerator = Accelerator(**accelerator_kwargs)
dataloader = torch.utils.data.DataLoader(torch.randn(10, 32), batch_size=2)
model = AutoModelForSequenceClassification.from_pretrained("bert-base-cased")
@ -78,13 +88,31 @@ def can_convert_ao_model():
class TestTransformerEngine(unittest.TestCase):
def test_can_prepare_model_single_gpu(self):
command = get_launch_command(num_processes=1, monitor_interval=0.1)
command += ["-m", "tests.test_fp8"]
command += ["-m", "tests.test_fp8", "--test_te"]
run_command(command)
def test_can_prepare_model_single_gpu_from_config(self):
with tempfile.TemporaryDirectory() as dir_name:
config_file = Path(dir_name) / "config.yaml"
config_file.write_text(
textwrap.dedent(
"""
distributed_type: "NO"
num_processes: 1
mixed_precision: fp8
fp8_config:
backend: TE
"""
)
)
command = get_launch_command(config_file=str(config_file), monitor_interval=0.1)
command += ["-m", "tests.test_fp8", "--test_te", "--from_config"]
run_command(command)
@require_multi_device
def test_can_prepare_model_multi_gpu(self):
command = get_launch_command(num_processes=2, monitor_interval=0.1)
command += ["-m", "tests.test_fp8"]
command += ["-m", "tests.test_fp8", "--test_te"]
run_command(command)
@require_deepspeed
@ -116,7 +144,7 @@ class TestTransformerEngine(unittest.TestCase):
command = get_launch_command(
num_processes=2, monitor_interval=0.1, use_deepspeed=True, deepspeed_config_file=ds_config
)
command += ["-m", "tests.test_fp8"]
command += ["-m", "tests.test_fp8", "--test_te"]
run_command(command)
@ -125,13 +153,31 @@ class TestTransformerEngine(unittest.TestCase):
class TestTorchAO(unittest.TestCase):
def test_can_prepare_model_single_accelerator(self):
command = get_launch_command(num_processes=1, monitor_interval=0.1)
command += ["-m", "tests.test_fp8"]
command += ["-m", "tests.test_fp8", "--test_ao"]
run_command(command)
def test_can_prepare_model_single_gpu_from_config(self):
with tempfile.TemporaryDirectory() as dir_name:
config_file = Path(dir_name) / "config.yaml"
config_file.write_text(
textwrap.dedent(
"""
distributed_type: "NO"
num_processes: 1
mixed_precision: fp8
fp8_config:
backend: AO
"""
)
)
command = get_launch_command(config_file=str(config_file), monitor_interval=0.1)
command += ["-m", "tests.test_fp8", "--test_ao", "--from_config"]
run_command(command)
@require_multi_device
def test_can_prepare_model_multi_accelerator(self):
command = get_launch_command(num_processes=2, monitor_interval=0.1)
command += ["-m", "tests.test_fp8"]
command += ["-m", "tests.test_fp8", "--test_ao"]
run_command(command)
@require_deepspeed
@ -163,16 +209,26 @@ class TestTorchAO(unittest.TestCase):
command = get_launch_command(
num_processes=2, monitor_interval=0.1, use_deepspeed=True, deepspeed_config_file=ds_config
)
command += ["-m", "tests.test_fp8"]
command += ["-m", "tests.test_fp8", "--test_ao"]
run_command(command)
if __name__ == "__main__":
# TE suite
if is_transformer_engine_available():
can_convert_te_model()
parser = argparse.ArgumentParser()
parser.add_argument("--test_te", action="store_true", default=False)
parser.add_argument("--test_ao", action="store_true", default=False)
parser.add_argument("--from_config", action="store_true", default=False)
args = parser.parse_args()
if not args.test_te and not args.test_ao:
raise ValueError("Must specify at least one of --test_te or --test_ao")
if args.test_te:
can_convert_te_model(args.from_config)
if os.environ.get("ACCELERATE_USE_DEEPSPEED", "false") == "true":
maintain_proper_deepspeed_config(int(os.environ.get("ZERO_STAGE")))
# AO suite
if is_torchao_available():
can_convert_ao_model()
if args.test_ao:
can_convert_ao_model(args.from_config)

View File

@ -61,7 +61,31 @@ class MemoryTest(unittest.TestCase):
raise_fake_out_of_memory()
mock_training_loop_function()
assert batch_sizes == [128, 64, 32, 16, 8]
assert batch_sizes == [
128,
115,
103,
92,
82,
73,
65,
58,
52,
46,
41,
36,
32,
28,
25,
22,
19,
17,
15,
13,
11,
9,
8,
]
def test_memory_explicit(self):
batch_sizes = []
@ -75,7 +99,31 @@ class MemoryTest(unittest.TestCase):
return batch_size, arg1
bs, arg1 = mock_training_loop_function("hello")
assert batch_sizes == [128, 64, 32, 16, 8]
assert batch_sizes == [
128,
115,
103,
92,
82,
73,
65,
58,
52,
46,
41,
36,
32,
28,
25,
22,
19,
17,
15,
13,
11,
9,
8,
]
assert [bs, arg1] == [8, "hello"]
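The expected lists above follow directly from the 10% back-off; a short sketch reproducing them, given that the mocked training function stops raising the fake OOM at batch size 8:

```
# Mirrors `int(batch_size * 0.9)` from `find_executable_batch_size`.
sizes, bs = [], 128
while True:
    sizes.append(bs)
    if bs == 8:  # the mock succeeds here, so the search stops
        break
    bs = int(bs * 0.9)

print(sizes)  # [128, 115, 103, 92, 82, 73, 65, 58, 52, 46, 41, 36, 32, 28, 25, 22, 19, 17, 15, 13, 11, 9, 8]
```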
def test_start_zero(self):

View File

@ -349,6 +349,26 @@ class ModelingUtilsTester(unittest.TestCase):
check_device_map(model, {"linear1": 0, "linear2": 1, "batchnorm": 1})
def test_check_device_map_invalid_keys(self):
model = ModelForTest()
device_map = {
"linear1": "cpu", # Valid module
"batchnorm": "cpu", # Valid module
"linear2": "cpu", # Valid module
"invalid_module": 0, # Invalid - should trigger warning
"another_invalid": 1, # Invalid - should trigger warning
}
# Test for the warning about invalid keys
with self.assertWarns(UserWarning) as cm:
check_device_map(model, device_map)
warning_msg = str(cm.warning)
self.assertIn("device_map keys do not match any submodules", warning_msg)
self.assertIn("invalid_module", warning_msg)
self.assertIn("another_invalid", warning_msg)
def shard_test_model(self, model, tmp_dir):
module_index = {
"linear1": "checkpoint_part1.bin",

View File

@ -28,7 +28,6 @@ from accelerate.test_utils import (
path_in_accelerate_package,
require_huggingface_suite,
require_multi_device,
require_non_hpu,
require_non_torch_xla,
require_pippy,
require_torchvision,
@ -70,7 +69,6 @@ class MultiDeviceTester(unittest.TestCase):
execute_subprocess_async(cmd)
@run_first
@require_non_hpu # Synapse detected a device critical error that requires a restart
@require_multi_device
def test_multi_device_merge_fsdp_weights(self):
print(f"Found {device_count} {torch_device} devices.")

View File

@ -16,6 +16,7 @@ import csv
import json
import logging
import os
import random
import re
import subprocess
import tempfile
@ -42,7 +43,9 @@ from accelerate.test_utils.testing import (
require_matplotlib,
require_mlflow,
require_pandas,
require_swanlab,
require_tensorboard,
require_trackio,
require_wandb,
skip,
)
@ -53,7 +56,9 @@ from accelerate.tracking import (
DVCLiveTracker,
GeneralTracker,
MLflowTracker,
SwanLabTracker,
TensorBoardTracker,
TrackioTracker,
WandBTracker,
)
from accelerate.utils import (
@ -520,6 +525,123 @@ class ClearMLTest(TempDirTestCase, MockingTestCase):
self.assertCountEqual(plot["data"][0]["cells"]["values"], [[1, 2], [3, 4], [5, 6]])
@require_swanlab
@mock.patch.dict(os.environ, {"SWANLAB_MODE": "offline"})
class SwanLabTrackingTest(TempDirTestCase, MockingTestCase):
def setUp(self):
super().setUp()
# Set the path where SwanLab parsed log files are saved via the SWANLAB_LOG_DIR env var
self.add_mocks(mock.patch.dict(os.environ, {"SWANLAB_LOG_DIR": self.tmpdir}))
@skip
def test_swanlab(self):
# Disable hardware monitoring to prevent errors in test mode.
import swanlab
from swanlab.log.backup import BackupHandler
from swanlab.log.backup.datastore import DataStore
from swanlab.log.backup.models import ModelsParser
swanlab.merge_settings(swanlab.Settings(hardware_monitor=False))
# Start a fake training session.
accelerator = Accelerator(log_with="swanlab")
project_name = "test_project_with_config"
experiment_name = "test"
description = "test project for swanlab"
tags = ["my_tag"]
config = {
"epochs": 10,
"learning_rate": 0.01,
"offset": 0.1,
}
kwargs = {
"swanlab": {
"experiment_name": experiment_name,
"description": description,
"tags": tags,
}
}
accelerator.init_trackers(project_name, config, kwargs)
record_metrics = []
record_scalars = []
record_images_count = 0
record_logs = []
for epoch in range(1, swanlab.config.epochs):
acc = 1 - 2**-epoch - random.random() / epoch - 0.1
loss = 2**-epoch + random.random() / epoch + 0.1
ll = swanlab.log(
{
"accuracy": acc,
"loss": loss,
"image": swanlab.Image(np.random.random((3, 3, 3))),
},
step=epoch,
)
log = f"epoch={epoch}, accuracy={acc}, loss={loss}"
print(log)
record_scalars.extend([acc, loss])
record_images_count += 1
record_logs.append(log)
record_metrics.extend([x for _, x in ll.items()])
accelerator.end_training()
# Load latest offline log
run_dir = swanlab.get_run().public.run_dir
assert os.path.exists(run_dir) is True
ds = DataStore()
ds.open_for_scan(os.path.join(run_dir.__str__(), BackupHandler.BACKUP_FILE).__str__())
with ModelsParser() as models_parser:
for record in ds:
if record is None:
continue
models_parser.parse_record(record)
header, project, experiment, logs, runtime, columns, scalars, medias, footer = models_parser.get_parsed()
# test file header
assert header.backup_type == "DEFAULT"
# test project info
assert project.name == project_name
assert project.workspace is None
assert project.public is None
# test experiment info
assert experiment.name is not None
assert experiment.description == description
assert experiment.tags == tags
# test log record
backup_logs = [log.message for log in logs]
for record_log in record_logs:
assert record_log in backup_logs, "Log not found in backup logs: " + record_log
# test runtime info
runtime_info = runtime.to_file_model(os.path.join(run_dir.__str__(), "files"))
assert runtime_info.conda is None, "Not using conda, should be None"
assert isinstance(runtime_info.requirements, str), "Requirements should be a string"
assert isinstance(runtime_info.metadata, dict), "Metadata should be a dictionary"
assert isinstance(runtime_info.config, dict), "Config should be a dictionary"
for key in runtime_info.config:
assert key in config, f"Config key {key} not found in original config"
assert runtime_info.config[key]["value"] == config[key], (
f"Config value for {key} does not match original value"
)
# test scalar
assert len(scalars) + len(medias) == len(record_metrics), "Total metrics count does not match"
backup_scalars = [
metric.metric["data"]
for metric in record_metrics
if metric.column_info.chart_type.value.column_type == "FLOAT"
]
assert len(backup_scalars) == len(scalars), "Total scalars count does not match"
for scalar in backup_scalars:
assert scalar in record_scalars, f"Scalar {scalar} not found in original scalars"
backup_images = [
metric for metric in record_metrics if metric.column_info.chart_type.value.column_type == "IMAGE"
]
assert len(backup_images) == record_images_count, "Total images count does not match"
class MyCustomTracker(GeneralTracker):
"Basic tracker that writes to a csv for testing"
@ -681,6 +803,15 @@ class TrackerDeferredInitializationTest(unittest.TestCase):
_ = Accelerator(log_with=tracker)
self.assertNotEqual(PartialState._shared_state, {})
@require_trackio
def test_trackio_deferred_init(self):
"""Test that trackio tracker initialization doesn't initialize distributed"""
PartialState._reset_state()
tracker = TrackioTracker(run_name="test_trackio")
self.assertEqual(PartialState._shared_state, {})
_ = Accelerator(log_with=tracker)
self.assertNotEqual(PartialState._shared_state, {})
@require_comet_ml
def test_comet_ml_deferred_init(self):
"""Test that CometML tracker initialization doesn't initialize distributed"""
@ -728,3 +859,12 @@ class TrackerDeferredInitializationTest(unittest.TestCase):
self.assertEqual(PartialState._shared_state, {})
_ = Accelerator(log_with=tracker)
self.assertNotEqual(PartialState._shared_state, {})
@require_swanlab
def test_swanlab_deferred_init(self):
"""Test that SwanLab tracker initialization doesn't initialize distributed"""
PartialState._reset_state()
tracker = SwanLabTracker(run_name="test_swanlab")
self.assertEqual(PartialState._shared_state, {})
_ = Accelerator(log_with=tracker)
self.assertNotEqual(PartialState._shared_state, {})