Compare commits

...

30 Commits

Author SHA1 Message Date
28a3b985f0 Release: v0.33.0 2024-07-23 09:11:56 -04:00
415eddf1be feat(ci): add pip caching in CI (#2952) 2024-07-22 16:55:08 -04:00
230857691a Properly handle Params4bit in set_module_tensor_to_device (#2934)
* Properly handle `Params4bit` in `set_module_tensor_to_device`

* Add comment to explain Params4bit skipping shape check for set_module_tensor_to_device
2024-07-22 08:42:49 -04:00
a5a3e57125 Add torch.float8_e4m3fn format dtype_byte_size (#2945)
* add new format

* check torch version

* style
2024-07-20 03:07:07 +02:00
0af1d8b8de delete CCL env var setting (#2927)
* delete CCL env var setting

* fix format
2024-07-17 22:15:46 -04:00
d16d7371a1 Improve test reliability for Accelerator.free_memory() (#2935) 2024-07-16 08:40:51 -04:00
7a5c231b9e Consider pynvml available when installed through the nvidia-ml-py distribution (#2936) 2024-07-16 08:40:16 -04:00
4f02bb764a Fix import test (#2931)
* Fix import test

* Tweak threash
2024-07-15 11:13:23 -04:00
YH
709fd1e42b Hotfix PyTorch Version Installation in CI Workflow for Minimum Version Matrix (#2889)
* Fix ci torch version matrix

* Patch torch minor version
2024-07-15 10:31:12 -04:00
f4f1260a0e Correct loading of models with shared tensors when using accelerator.load_state() (#2875)
* Enabled correct loading of models with shared tensors when using accelerator.load_state()

* removed unused import

* added a test for a model with shared weights

* removed unnecessary bits

* fixed linting errors
2024-07-15 10:29:17 -04:00
c6da9f8693 Allow multiple process per device (#2916)
* Allow more processes than devices

* Accept suggestion

Co-authored-by: Zach Mueller <muellerzr@gmail.com>

---------

Co-authored-by: Zach Mueller <muellerzr@gmail.com>
2024-07-15 10:18:15 -04:00
3ebbe573ad Add huggingface_hub version to setup.py (#2932) 2024-07-15 10:11:41 -04:00
24bf5ec546 add xpu device check before moving tensor directly to xpu device (#2928)
* add ipex check

* fix type

* fix bug
2024-07-15 09:30:22 -04:00
e1247de01e Better error when a bad directory is given for weight merging (#2852) 2024-07-12 13:20:00 -04:00
12a007d559 Support MUSA (Moore Threads GPU) backend in accelerate (#2917) 2024-07-10 13:42:28 +02:00
5bdcd7e169 fix: bug where mulit_gpu was being set and warning being printed even with num_processes=1 (#2921)
Signed-off-by: Harikrishnan Balagopal <harikrishmenon@gmail.com>
2024-07-08 12:06:30 -04:00
2471eacdd6 Fix slowdown on init with device_map="auto" (#2914) 2024-07-04 09:10:21 -04:00
167cb5eb20 [tests] fix bug in torch_device (#2909) 2024-07-04 06:44:40 -04:00
947f64ee62 Version update 2024-07-03 13:27:34 -04:00
8330b375d4 Fix get_backend bug and add clear_device_cache function (#2857)
* added clear_device_cache

* set lambda: 0 for mps and cpu
2024-07-03 06:59:10 -04:00
92404fbf5f fix load_state_dict for xpu and refine xpu safetensor version check (#2879)
* add fix

* update warning

* no and
2024-07-03 06:36:36 -04:00
3a02754915 add require_triton and enable test_dynamo work on xpu (#2878) 2024-07-03 04:52:09 -04:00
fec1170e35 fix mlu device longTensor bugs (#2887)
* Add Cambricon MLU accelerator support

* up mlu support for test

* fix mlu device MULTI_MLU

* Update src/accelerate/utils/imports.py

it's beautiful !

Co-authored-by: Zach Mueller <muellerzr@gmail.com>

* up mlu for quality check

* fix mlu device longTensor error

* fix mlu device tensor dtype check

* fix mlu device send_to_device with torch dynamo error

* Refactor AcceleratorState

* Should be near complete now

* Last missing piece

* Make my way to the acceleratorstate

* Include update to global var

* Don't use global

* gpu -> cuda

* Don't use update for dict, easier to read

* Fix tests

* stash

* Getting closer...

* Needed to spawn at the very end after env was setup

* Explain set_device before deepspeed

* Make docstring more accurate

* Early return insteaD

* Delineat blocks

* Make prepare_backend return state + backend for clarity/less magic

* fix mlu longtensor.to() bugs.

---------

Co-authored-by: Zach Mueller <muellerzr@gmail.com>
2024-07-03 04:50:11 -04:00
eac206f063 make more cuda-only tests device-agnostic (#2876)
* enable 3 cases

* add ests

* add 2 more

* revert 1 back

* revert 1 more

* enable on xpu
2024-07-03 04:49:53 -04:00
6882ff2bea Added a MultiCPU SLURM example using Accelerate Launch and MPIRun (#2902)
* initial commit for slurm multicpu script

* changed output path

* Added multicpu example using accelerate + mpirun + slurm

* removed file

* rename file

* deleted file

* refactored for cleanliness

* updated docs

* fixed variable names

* quality update

* test fix

* addressed review comments

* fix typo for activateEnvironment.sh

* added ACCELERATE path

* Edit wording

Co-authored-by: Dina Suehiro Jones <dina.s.jones@intel.com>

* added back mistakenly deleted line

---------

Co-authored-by: Dina Suehiro Jones <dina.s.jones@intel.com>
2024-07-03 04:14:02 -04:00
57a4c7465e Add XLA Dynamo backends for training and inference (#2892) 2024-07-03 04:10:13 -04:00
YH
404510a5ec Make log_line_prefix_template Optional in Elastic Launcher for Backward Compatibility (#2888)
* Fix unexpected keyword argument err for elastic launch config

* Update torch version flow

* Del log prefix template from env vars
2024-07-03 04:06:08 -04:00
3086e26db9 Speed up imports and add a CI (#2845)
* Working test

* Timing cleanup

* Add CI

* Fix nits

* Mixup imports

* Clean

* tuna -> tuna-interpreter

* Refactor pippy imports

* Accelerator

* Fin

* Fin

* Keep specific ones for docs
2024-07-01 18:50:18 -04:00
YH
5d5d07abfc Add Profiler Support for Performance Analysis (#2883)
* Add torch profiler

* Add example

* Fix rank 0 saving

* Add docstring

* Add profile readme

* Fix minor

* Fix example path

* Add exp test code

* Rename profile dir

* Change readme

* Change save format

* Minor

* Enhance docstring example

* Add user guide

* Add memory profile guide

* Enhance error msg

* Fix type hinting

* Minor refactor

* Fix hf tag

* Fix copyright year

* Mv toctree

* Fix image path

* Fix license year

* Change profiler pattern name

* Update package reference

* Add slow decorator

* Check output value
2024-07-01 18:01:09 -04:00
5a0b7dc597 Support saving and loading of step while saving and loading state (#2765)
* Add feature to save step when saving state

* Update docstring for `load_accelerate_state`
2024-07-01 14:57:19 -04:00
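The last entry above (#2765) is implemented by the `checkpointing.py` and `accelerator.py` hunks further down. As a rough illustration of the user-facing effect, here is a minimal sketch (the checkpoint path and the elided training code are assumptions, not part of the diff):

```python
from accelerate import Accelerator

accelerator = Accelerator()
# ... prepare objects and run some training steps ...
accelerator.save_state("ckpt")   # the saved RNG-state file now also records the step counter

# In a later run:
accelerator = Accelerator()
accelerator.load_state("ckpt")
print(accelerator.step)          # restored from the checkpoint instead of starting at 0
```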
54 changed files with 1573 additions and 219 deletions

View File

@ -31,6 +31,8 @@ jobs:
uses: actions/setup-python@v3
with:
python-version: 3.8
cache: 'pip'
cache-dependency-path: 'setup.py'
- name: Install Accelerate from source
run: |

View File

@ -11,6 +11,8 @@ jobs:
uses: actions/setup-python@v3
with:
python-version: 3.8
cache: 'pip'
cache-dependency-path: 'setup.py'
- name: Install Python dependencies
run: pip install -e .[quality]
- name: Run Quality check

View File

@ -19,10 +19,12 @@ jobs:
uses: actions/setup-python@v3
with:
python-version: 3.8
cache: 'pip'
cache-dependency-path: 'setup.py'
- name: Install requirements
run: |
pip install PyGithub
- name: Close stale issues
run: |
python utils/stale.py
python utils/stale.py

View File

@ -43,13 +43,15 @@ jobs:
uses: actions/setup-python@v3
with:
python-version: 3.8
cache: 'pip'
cache-dependency-path: 'setup.py'
- name: Install the library
run: |
if [[ ${{ matrix.test-kind }} = test_prod ]]; then pip install -e .[test_prod]; fi
if [[ ${{ matrix.test-kind }} != test_prod ]]; then pip install -e .[testing,test_trackers]; fi
if [[ ${{ matrix.test-kind }} = test_rest ]]; then pip uninstall comet_ml -y; fi
if [[ ${{ matrix.test-kind }} = minimum ]]; then pip install torch==1.10.0; fi
if [[ ${{ matrix.pytorch-version }} = minimum ]]; then pip install torch==2.3.1; fi
pip install pytest-reportlog tabulate setuptools
- name: Show installed libraries
@ -65,4 +67,4 @@ jobs:
- name: Generate Report
if: always()
run: |
python utils/log_reports.py >> $GITHUB_STEP_SUMMARY
python utils/log_reports.py >> $GITHUB_STEP_SUMMARY

55
.github/workflows/test_imports.yml vendored Normal file
View File

@ -0,0 +1,55 @@
name: Run Import Tests
on:
pull_request:
paths:
- "src/**"
- "tests/**"
- ".github/**"
- "examples/**"
- "setup.py"
types: [opened, synchronize, reopened]
env:
HF_HOME: ~/hf_cache
TESTING_MOCKED_DATALOADERS: "1"
IS_GITHUB_CI: "1"
jobs:
run-tests:
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
pytorch-version: [
latest,
minimum,
]
steps:
- uses: actions/checkout@v3.1.0
- name: Set up python 3.8
uses: actions/setup-python@v3
with:
python-version: 3.8
cache: 'pip'
cache-dependency-path: 'setup.py'
- name: Install the library
run: |
pip install -e .
pip install pytest-reportlog tabulate setuptools git+https://github.com/muellerzr/import-timer
- name: Show installed libraries
run: |
pip freeze
- name: Run Import Tests
env:
PYTORCH_VERSION: ${{ matrix.pytorch-version }}
run: |
pytest -sv tests/test_imports.py
- name: Generate Report
if: always()
run: |
python utils/log_reports.py >> $GITHUB_STEP_SUMMARY

View File

@ -31,6 +31,8 @@
title: Model quantization
- local: usage_guides/tracking
title: Experiment trackers
- local: usage_guides/profiler
title: Profiler
- local: usage_guides/checkpoint
title: Save and load training states
- local: basic_tutorials/troubleshooting

Binary file not shown (new image added, 105 KiB).

View File

@ -30,6 +30,10 @@ related to distributed training or mixed precision are created.
[[autodoc]] utils.FP8RecipeKwargs
## ProfileKwargs
[[autodoc]] utils.ProfileKwargs
## GradScalerKwargs
[[autodoc]] GradScalerKwargs

View File

@ -0,0 +1,334 @@
<!--
Copyright 2024 The HuggingFace Team. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
⚠️ Note that this file is in Markdown but contains specific syntax for our doc-builder (similar to MDX) that may not be
rendered properly in your Markdown viewer.
-->
# Profiler
Profiler is a tool that collects performance metrics during training and inference. The profiler's context manager API can be used to better understand which model operators are the most expensive, examine their input shapes and stack traces, study device kernel activity, and visualize the execution trace. It provides insights into the performance of your model, allowing you to optimize and improve it.
This guide explains how to use the PyTorch Profiler to measure the time and memory consumption of the model's operators and how to integrate this with 🤗 Accelerate. We will cover various use cases and provide examples for each.
## Using profiler to analyze execution time
Profiler allows one to check which operators were called during the execution of a code range wrapped with a profiler context manager.
Let's see how we can use the profiler to analyze execution time:
<hfoptions id="cpu execution time">
<hfoption id="PyTorch">
```python
import torch
import torchvision.models as models
from torch.profiler import profile, record_function, ProfilerActivity
model = models.resnet18()
inputs = torch.randn(5, 3, 224, 224)
with profile(activities=[ProfilerActivity.CPU], record_shapes=True) as prof:
model(inputs)
print(prof.key_averages().table(sort_by="cpu_time_total", row_limit=10))
```
</hfoption>
<hfoption id="Accelerate">
```python
from accelerate import Accelerator, ProfileKwargs
import torch
import torchvision.models as models
model = models.resnet18()
inputs = torch.randn(5, 3, 224, 224)
profile_kwargs = ProfileKwargs(
activities=["cpu"],
record_shapes=True
)
accelerator = Accelerator(cpu=True, kwargs_handlers=[profile_kwargs])
model = accelerator.prepare(model)
with accelerator.profile() as prof:
with torch.no_grad():
model(inputs)
print(prof.key_averages().table(sort_by="cpu_time_total", row_limit=10))
```
</hfoption>
</hfoptions>
The resulting table output (omitting some columns):
```
--------------------------------- ------------ ------------ ------------ ------------
Name Self CPU CPU total CPU time avg # of Calls
--------------------------------- ------------ ------------ ------------ ------------
aten::conv2d 171.000us 52.260ms 2.613ms 20
aten::convolution 227.000us 52.089ms 2.604ms 20
aten::_convolution 270.000us 51.862ms 2.593ms 20
aten::mkldnn_convolution 51.273ms 51.592ms 2.580ms 20
aten::batch_norm 118.000us 7.059ms 352.950us 20
aten::_batch_norm_impl_index 315.000us 6.941ms 347.050us 20
aten::native_batch_norm 6.305ms 6.599ms 329.950us 20
aten::max_pool2d 40.000us 4.008ms 4.008ms 1
aten::max_pool2d_with_indices 3.968ms 3.968ms 3.968ms 1
aten::add_ 780.000us 780.000us 27.857us 28
--------------------------------- ------------ ------------ ------------ ------------
Self CPU time total: 67.016ms
```
To get a finer granularity of results and include operator input shapes, pass `group_by_input_shape=True` (note: this requires running the profiler with `record_shapes=True`):
```python
print(prof.key_averages(group_by_input_shape=True).table(sort_by="cpu_time_total", row_limit=10))
```
## Using profiler to analyze memory consumption
The profiler can also show the amount of memory (used by the model's tensors) that was allocated (or released) during the execution of the model's operators. To enable memory profiling, pass `profile_memory=True`.
<hfoptions id="memory consumption">
<hfoption id="PyTorch">
```python
model = models.resnet18()
inputs = torch.randn(5, 3, 224, 224)
with profile(activities=[ProfilerActivity.CPU],
profile_memory=True, record_shapes=True) as prof:
model(inputs)
print(prof.key_averages().table(sort_by="self_cpu_memory_usage", row_limit=10))
```
</hfoption>
<hfoption id="Accelerate">
```python
model = models.resnet18()
inputs = torch.randn(5, 3, 224, 224)
profile_kwargs = ProfileKwargs(
activities=["cpu"],
profile_memory=True,
record_shapes=True
)
accelerator = Accelerator(cpu=True, kwargs_handlers=[profile_kwargs])
model = accelerator.prepare(model)
with accelerator.profile() as prof:
model(inputs)
print(prof.key_averages().table(sort_by="self_cpu_memory_usage", row_limit=10))
```
</hfoption>
</hfoptions>
The resulting table output (omitting some columns):
```
--------------------------------- ------------ ------------ ------------
Name CPU Mem Self CPU Mem # of Calls
--------------------------------- ------------ ------------ ------------
aten::empty 94.85 Mb 94.85 Mb 205
aten::max_pool2d_with_indices 11.48 Mb 11.48 Mb 1
aten::addmm 19.53 Kb 19.53 Kb 1
aten::mean 10.00 Kb 10.00 Kb 1
aten::empty_strided 492 b 492 b 5
aten::cat 240 b 240 b 6
aten::abs 480 b 240 b 4
aten::masked_select 120 b 112 b 1
aten::ne 61 b 53 b 3
aten::eq 30 b 30 b 1
--------------------------------- ------------ ------------ ------------
Self CPU time total: 69.332ms
```
## Exporting chrome trace
You can examine the sequence of profiled operators and CUDA kernels in the Chrome trace viewer (`chrome://tracing`):
![profile_export](https://github.com/huggingface/accelerate/assets/100389977/5acb193f-6d11-4f7b-9873-c600c19e8172)
<hfoptions id="exporting chrome trace">
<hfoption id="PyTorch">
```python
model = models.resnet18().cuda()
inputs = torch.randn(5, 3, 224, 224).cuda()
with profile(activities=[ProfilerActivity.CPU, ProfilerActivity.CUDA]) as prof:
model(inputs)
prof.export_chrome_trace("trace.json")
```
</hfoption>
<hfoption id="Accelerate">
```python
profile_kwargs = ProfileKwargs(
activities=["cpu", "cuda"],
output_trace_dir="trace"
)
accelerator = Accelerator(kwargs_handlers=[profile_kwargs])
model = accelerator.prepare(model)
with accelerator.profile() as prof:
model(inputs)
# The trace will be saved to the specified directory
```
</hfoption>
</hfoptions>
## Using Profiler to Analyze Long-Running Jobs
Profiler offers an additional API to handle long-running jobs (such as training loops). Tracing all of the execution can be slow and result in very large trace files. To avoid this, use optional arguments:
- `schedule_option`: Scheduling options allow you to control when profiling is active. This is useful for long-running jobs to avoid collecting too much data. Available keys are `wait`, `warmup`, `active`, `repeat` and `skip_first`. The profiler will skip the first `skip_first` steps, then wait for `wait` steps, warm up for the next `warmup` steps, actively record for the next `active` steps, and then repeat the cycle starting with the `wait` steps. The optional number of cycles is specified with the `repeat` parameter; a value of zero means the cycles continue until profiling is finished.
- `on_trace_ready`: specifies a function that takes a reference to the profiler as an input and is called by the profiler each time a new trace is ready.
To illustrate how the API works, consider the following example:
<hfoptions id="custom handler">
<hfoption id="PyTorch">
```python
from torch.profiler import schedule
my_schedule = schedule(
skip_first=10,
wait=5,
warmup=1,
active=3,
repeat=2
)
def trace_handler(p):
output = p.key_averages().table(sort_by="self_cuda_time_total", row_limit=10)
print(output)
p.export_chrome_trace("/tmp/trace_" + str(p.step_num) + ".json")
with profile(
activities=[ProfilerActivity.CPU, ProfilerActivity.CUDA],
schedule=my_schedule,
on_trace_ready=trace_handler
) as p:
for idx in range(8):
model(inputs)
p.step()
```
</hfoption>
<hfoption id="Accelerate">
```python
def trace_handler(p):
output = p.key_averages().table(sort_by="self_cuda_time_total", row_limit=10)
print(output)
p.export_chrome_trace("/tmp/trace_" + str(p.step_num) + ".json")
profile_kwargs = ProfileKwargs(
activities=["cpu", "cuda"],
schedule_option={"wait": 5, "warmup": 1, "active": 3, "repeat": 2, "skip_first": 10},
on_trace_ready=trace_handler
)
accelerator = Accelerator(kwargs_handlers=[profile_kwargs])
model = accelerator.prepare(model)
with accelerator.profile() as prof:
for idx in range(8):
model(inputs)
prof.step()
```
</hfoption>
</hfoptions>
## FLOPS
The profiler uses a formula to estimate the FLOPs (floating point operations) of specific operators (matrix multiplication and 2D convolution).
To measure floating-point operations (FLOPS), pass `with_flops=True`:
<hfoptions id="FLOPS">
<hfoption id="PyTorch">
```python
with profile(
activities=[ProfilerActivity.CPU, ProfilerActivity.CUDA],
with_flops=True
) as prof:
model(inputs)
print(prof.key_averages().table(sort_by="flops", row_limit=10))
```
</hfoption>
<hfoption id="Accelerate">
```python
profile_kwargs = ProfileKwargs(
with_flops=True
)
accelerator = Accelerator(kwargs_handlers=[profile_kwargs])
with accelerator.profile() as prof:
model(inputs)
print(prof.key_averages().table(sort_by="flops", row_limit=10))
```
</hfoption>
</hfoptions>
The resulting table output (omitting some columns):
```
------------------------------------------------------- ------------ ------------ ------------
Name Self CPU Self CUDA Total FLOPs
------------------------------------------------------- ------------ ------------ ------------
aten::conv2d 197.000us 0.000us 18135613440.000
aten::addmm 103.000us 17.000us 5120000.000
aten::mul 29.000us 2.000us 30.000
aten::convolution 409.000us 0.000us --
aten::_convolution 253.000us 0.000us --
aten::cudnn_convolution 5.465ms 2.970ms --
cudaEventRecord 138.000us 0.000us --
cudaStreamIsCapturing 43.000us 0.000us --
cudaStreamGetPriority 40.000us 0.000us --
cudaDeviceGetStreamPriorityRange 10.000us 0.000us --
------------------------------------------------------- ------------ ------------ ------------
Self CPU time total: 21.938ms
Self CUDA time total: 4.165ms
```
## Conclusion and Further Information
PyTorch Profiler is a powerful tool for analyzing the performance of your models. By integrating it with 🤗 Accelerate, you can easily profile your models and gain insights into their performance, helping you to optimize and improve them.
For more detailed information, refer to the [PyTorch Profiler documentation](https://pytorch.org/docs/stable/profiler.html).

View File

@ -233,6 +233,8 @@ In [/slurm/submit_multigpu.sh](./slurm/submit_multigpu.sh) the only parameter in
In [/slurm/submit_multinode.sh](./slurm/submit_multinode.sh) we must specify the number of nodes that will be part of the training (`--num_machines`), how many GPUs we will use in total (`--num_processes`), the [`backend`](https://pytorch.org/docs/stable/elastic/run.html#note-on-rendezvous-backend), `--main_process_ip`, which will be the address of the master node, and the `--main_process_port`.
In [/slurm/submit_multicpu.sh](./slurm/submit_multicpu.sh) we must specify the number of nodes that will be part of the training (`--num_machines`), how many CPU processes we will use in total (`--num_processes`), the [`backend`](https://pytorch.org/docs/stable/elastic/run.html#note-on-rendezvous-backend), `--main_process_ip`, which will be the address of the master node, and the `--main_process_port`. `mpirun_hostfile` specifies the hostfile to use when launching the job with MPIRun.
In all of these scripts, we run `activateEnvironment.sh` at the beginning. This script should contain the necessary instructions to initialize the environment for execution. Below, we show an example that loads the necessary libraries ([Environment modules](https://github.com/cea-hpc/modules)), activates the Python environment, and sets up various environment variables, most of them to run the scripts in offline mode in case we don't have an internet connection from the cluster.
```bash

View File

@ -99,3 +99,23 @@ These arguments should be added at the end of any method for starting the python
```bash
accelerate launch ./ddp_comm_hook.py --mixed_precision fp16 --ddp_comm_hook power_sgd
```
### Profiler (`profiler.py`)
- Shows how to use the profiling capabilities of `Accelerate` to profile PyTorch models during training.
- Uses the `ProfileKwargs` handler to customize profiling options, including activities, scheduling, and additional profiling options.
- Can generate and save profiling traces in JSON format for visualization in Chrome's tracing tool.
Arguments available:
- `--record_shapes`: If passed, records shapes for profiling.
- `--profile_memory`: If passed, profiles memory usage.
- `--with_stack`: If passed, profiles stack traces.
- `--with_flops`: If passed, profiles floating point operations (FLOPS).
- `--output_trace_dir`: If specified, saves the profiling trace to the given dir in JSON format.
- `--cpu`: If passed, trains on the CPU instead of GPU.
These arguments should be added at the end of any method for starting the Python script (such as `python`, `accelerate launch`, `torchrun`), for example:
```bash
accelerate launch ./profiler.py --record_shapes --profile_memory --with_flops --output_trace_dir "profiler"
```

View File

@ -0,0 +1,254 @@
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import os
import evaluate
import torch
from datasets import load_dataset
from torch.optim import AdamW
from torch.utils.data import DataLoader
from transformers import AutoModelForSequenceClassification, AutoTokenizer, get_linear_schedule_with_warmup, set_seed
from accelerate import Accelerator, DistributedType
from accelerate.utils import ProfileKwargs
########################################################################
# This is a fully working simple example to use Accelerate
# and perform profiling
#
# This example trains a Bert base model on GLUE MRPC
# in any of the following settings (with the same script):
# - single CPU or single GPU
# - multi GPUS (using PyTorch distributed mode)
# - (multi) TPUs
# - fp16 (mixed-precision) or fp32 (normal precision)
#
# To run it in each of these various modes, follow the instructions
# in the readme for examples:
# https://github.com/huggingface/accelerate/tree/main/examples
#
########################################################################
MAX_GPU_BATCH_SIZE = 16
EVAL_BATCH_SIZE = 32
def get_dataloaders(accelerator: Accelerator, batch_size: int = 16):
"""
Creates a set of `DataLoader`s for the `glue` dataset,
using "bert-base-cased" as the tokenizer.
Args:
accelerator (`Accelerator`):
An `Accelerator` object
batch_size (`int`, *optional*):
The batch size for the train and validation DataLoaders.
"""
tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")
datasets = load_dataset("glue", "mrpc")
def tokenize_function(examples):
# max_length=None => use the model max length (it's actually the default)
outputs = tokenizer(examples["sentence1"], examples["sentence2"], truncation=True, max_length=None)
return outputs
# Apply the method we just defined to all the examples in all the splits of the dataset
# starting with the main process first:
with accelerator.main_process_first():
tokenized_datasets = datasets.map(
tokenize_function,
batched=True,
remove_columns=["idx", "sentence1", "sentence2"],
)
# We also rename the 'label' column to 'labels' which is the expected name for labels by the models of the
# transformers library
tokenized_datasets = tokenized_datasets.rename_column("label", "labels")
def collate_fn(examples):
# On TPU it's best to pad everything to the same length or training will be very slow.
max_length = 128 if accelerator.distributed_type == DistributedType.XLA else None
# When using mixed precision we want round multiples of 8/16
if accelerator.mixed_precision == "fp8":
pad_to_multiple_of = 16
elif accelerator.mixed_precision != "no":
pad_to_multiple_of = 8
else:
pad_to_multiple_of = None
return tokenizer.pad(
examples,
padding="longest",
max_length=max_length,
pad_to_multiple_of=pad_to_multiple_of,
return_tensors="pt",
)
# Instantiate dataloaders.
train_dataloader = DataLoader(
tokenized_datasets["train"], shuffle=True, collate_fn=collate_fn, batch_size=batch_size
)
eval_dataloader = DataLoader(
tokenized_datasets["validation"], shuffle=False, collate_fn=collate_fn, batch_size=EVAL_BATCH_SIZE
)
return train_dataloader, eval_dataloader
# For testing only
if os.environ.get("TESTING_MOCKED_DATALOADERS", None) == "1":
from accelerate.test_utils.training import mocked_dataloaders
get_dataloaders = mocked_dataloaders # noqa: F811
def training_function(config, args):
# For testing only
if os.environ.get("TESTING_MOCKED_DATALOADERS", None) == "1":
config["num_epochs"] = 2
# New Code #
profile_kwargs = ProfileKwargs(
record_shapes=args.record_shapes,
profile_memory=args.profile_memory,
with_flops=args.with_flops,
output_trace_dir=args.output_trace_dir,
)
# Initialize accelerator
accelerator = Accelerator(cpu=args.cpu, mixed_precision=args.mixed_precision, kwargs_handlers=[profile_kwargs])
# Sample hyper-parameters for learning rate, batch size, seed and a few other HPs
lr = config["lr"]
num_epochs = int(config["num_epochs"])
seed = int(config["seed"])
batch_size = int(config["batch_size"])
metric = evaluate.load("glue", "mrpc")
set_seed(seed)
train_dataloader, eval_dataloader = get_dataloaders(accelerator, batch_size)
# Instantiate the model (we build the model here so that the seed also controls new weight initialization)
model = AutoModelForSequenceClassification.from_pretrained("bert-base-cased", return_dict=True)
# We could avoid this line since the accelerator is set with `device_placement=True` (default value).
# Note that if you are placing tensors on devices manually, this line absolutely needs to be before the optimizer
# creation otherwise training will not work on TPU (`accelerate` will kindly throw an error to make us aware of that).
model = model.to(accelerator.device)
# Instantiate optimizer
optimizer = AdamW(params=model.parameters(), lr=lr)
# Instantiate scheduler
lr_scheduler = get_linear_schedule_with_warmup(
optimizer=optimizer,
num_warmup_steps=100,
num_training_steps=(len(train_dataloader) * num_epochs),
)
# Prepare everything
# There is no specific order to remember, we just need to unpack the objects in the same order we gave them to the
# prepare method.
model, optimizer, train_dataloader, eval_dataloader, lr_scheduler = accelerator.prepare(
model, optimizer, train_dataloader, eval_dataloader, lr_scheduler
)
# Now we train the model
for epoch in range(num_epochs):
model.train()
# New Code #
with accelerator.profile() as prof:
for step, batch in enumerate(train_dataloader):
# We could avoid this line since we set the accelerator with `device_placement=True`.
batch.to(accelerator.device)
# We use the new `accumulate` context manager to perform gradient accumulation
with accelerator.accumulate(model):
output = model(**batch)
loss = output.loss
accelerator.backward(loss)
optimizer.step()
lr_scheduler.step()
optimizer.zero_grad()
# New Code #
accelerator.print(
prof.key_averages().table(
sort_by="self_cpu_time_total" if args.cpu else "self_cuda_time_total", row_limit=-1
)
)
model.eval()
for step, batch in enumerate(eval_dataloader):
# We could avoid this line since we set the accelerator with `device_placement=True`.
batch.to(accelerator.device)
with torch.no_grad():
outputs = model(**batch)
predictions = outputs.logits.argmax(dim=-1)
predictions, references = accelerator.gather_for_metrics((predictions, batch["labels"]))
metric.add_batch(
predictions=predictions,
references=references,
)
eval_metric = metric.compute()
# Use accelerator.print to print only on the main process.
accelerator.print(f"epoch {epoch}:", eval_metric)
def main():
parser = argparse.ArgumentParser(description="Simple example of training script.")
parser.add_argument(
"--mixed_precision",
type=str,
default=None,
choices=["no", "fp16", "bf16", "fp8"],
help="Whether to use mixed precision. Choose"
"between fp16 and bf16 (bfloat16). Bf16 requires PyTorch >= 1.10."
"and an Nvidia Ampere GPU.",
)
# New Code #
parser.add_argument(
"--record_shapes",
action="store_true",
default=False,
help="If passed, will record shapes for profiling.",
)
# New Code #
parser.add_argument(
"--profile_memory",
action="store_true",
default=False,
help="If passed, will profile memory.",
)
# New Code #
parser.add_argument(
"--with_flops",
action="store_true",
default=False,
help="If passed, will profile flops.",
)
# New Code #
parser.add_argument(
"--output_trace_dir",
type=str,
default=None,
help="If passed, will save a json trace to the specified path.",
)
parser.add_argument("--cpu", action="store_true", help="If passed, will train on the CPU.")
args = parser.parse_args()
config = {"lr": 2e-5, "num_epochs": 3, "seed": 42, "batch_size": 16}
training_function(config, args)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,65 @@
#!/bin/bash -l
#SBATCH --job-name=multicpu
#SBATCH --nodes=2 # number of Nodes
#SBATCH --ntasks-per-node=1 # number of MP tasks
#SBATCH --exclusive
#SBATCH --output=O-%x.%j
#SBATCH --error=E-%x.%j
######################
### Set enviroment ###
######################
source activateEnvironment.sh
######################
#### Set network #####
######################
head_node_ip=$(scontrol show hostnames $SLURM_JOB_NODELIST | head -n 1)
######################
# Setup env variables for distributed jobs
export MASTER_PORT="${MASTER_PORT:-29555 }"
echo "head_node_ip=${head_node_ip}"
echo "MASTER_PORT=${MASTER_PORT}"
INSTANCES_PER_NODE="${INSTANCES_PER_NODE:-1}"
if [[ $SLURM_NNODES == 1 ]] && [[ $INSTANCES_PER_NODE == 1 ]]; then
export CCL_WORKER_COUNT=0
LAUNCHER=""
else
# Setup env variables for distributed jobs
export CCL_WORKER_COUNT="${CCL_WORKER_COUNT:-2}"
echo "CCL_WORKER_COUNT=${CCL_WORKER_COUNT}"
# Write hostfile
HOSTFILE_PATH=hostfile
scontrol show hostname $SLURM_JOB_NODELIST | perl -ne 'chomp; print "$_\n" x 1' > ${HOSTFILE_PATH}
export LAUNCHER="accelerate launch \
--num_processes $((SLURM_NNODES * ${INSTANCES_PER_NODE})) \
--num_machines $SLURM_NNODES \
--rdzv_backend c10d \
--main_process_ip $head_node_ip \
--main_process_port $MASTER_PORT \
--mpirun_hostfile $HOSTFILE_PATH \
--mpirun_ccl $CCL_WORKER_COUNT"
fi
# This step is necessary because accelerate launch does not handle multiline arguments properly
export ACCELERATE_DIR="${ACCELERATE_DIR:-/accelerate}"
export SCRIPT="${ACCELERATE_DIR}/examples/complete_nlp_example.py"
export SCRIPT_ARGS=" \
--cpu \
--output_dir ${ACCELERATE_DIR}/examples/output \
"
# This step is necessary because accelerate launch does not handle multiline arguments properly
export CMD="$LAUNCHER $SCRIPT $SCRIPT_ARGS"
# Print the command
echo $CMD
echo ""
# Run the command
eval $CMD

View File

@ -13,14 +13,15 @@
######################
### Set enviroment ###
######################
source activateEnviroment.sh
source activateEnvironment.sh
export GPUS_PER_NODE=4
######################
export SCRIPT=/accelerate/examples/complete_nlp_example.py
export ACCELERATE_DIR="${ACCELERATE_DIR:-/accelerate}"
export SCRIPT="${ACCELERATE_DIR}/examples/complete_nlp_example.py"
export SCRIPT_ARGS=" \
--mixed_precision fp16 \
--output_dir /accelerate/examples/output \
--output_dir ${ACCELERATE_DIR}/examples/output \
--with_tracking \
"

View File

@ -13,7 +13,7 @@
######################
### Set enviroment ###
######################
source activateEnviroment.sh
source activateEnvironment.sh
export GPUS_PER_NODE=4
######################
@ -30,10 +30,11 @@ export LAUNCHER="accelerate launch \
--main_process_ip $head_node_ip \
--main_process_port 29500 \
"
export SCRIPT="/accelerate/examples/complete_nlp_example.py"
export ACCELERATE_DIR="${ACCELERATE_DIR:-/accelerate}"
export SCRIPT="${ACCELERATE_DIR}/examples/complete_nlp_example.py"
export SCRIPT_ARGS=" \
--mixed_precision fp16 \
--output_dir /accelerate/examples/output \
--output_dir ${ACCELERATE_DIR}/examples/output \
"
# This step is necessary because accelerate launch does not handle multiline arguments properly

View File

@ -48,7 +48,7 @@ extras["sagemaker"] = [
setup(
name="accelerate",
version="0.32.0.dev0",
version="0.33.0",
description="Accelerate",
long_description=open("README.md", encoding="utf-8").read(),
long_description_content_type="text/markdown",
@ -75,7 +75,7 @@ setup(
"psutil",
"pyyaml",
"torch>=1.10.0",
"huggingface_hub",
"huggingface_hub>=0.21.0",
"safetensors>=0.3.1",
],
extras_require=extras,

View File

@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
__version__ = "0.32.0.dev0"
__version__ = "0.33.0"
from .accelerator import Accelerator
from .big_modeling import (
@ -37,6 +37,7 @@ from .utils import (
FullyShardedDataParallelPlugin,
GradScalerKwargs,
InitProcessGroupKwargs,
ProfileKwargs,
find_executable_batch_size,
infer_auto_device_map,
is_rich_available,

View File

@ -64,6 +64,7 @@ from .utils import (
LoggerType,
MegatronLMPlugin,
PrecisionType,
ProfileKwargs,
ProjectConfiguration,
RNGType,
TorchDynamoPlugin,
@ -80,12 +81,12 @@ from .utils import (
has_transformer_engine_layers,
is_bf16_available,
is_deepspeed_available,
is_fp8_available,
is_ipex_available,
is_lomo_available,
is_megatron_lm_available,
is_mlu_available,
is_msamp_available,
is_musa_available,
is_npu_available,
is_torch_version,
is_torch_xla_available,
@ -102,7 +103,7 @@ from .utils import (
save_fsdp_optimizer,
wait_for_everyone,
)
from .utils.constants import FSDP_PYTORCH_VERSION
from .utils.constants import FSDP_PYTORCH_VERSION, PROFILE_PATTERN_NAME
from .utils.modeling import get_state_dict_offloaded_model
from .utils.other import is_compiled_module
@ -116,11 +117,6 @@ if is_deepspeed_available():
DummyScheduler,
)
if is_fp8_available():
import transformer_engine.common.recipe as te_recipe
from transformer_engine.pytorch import fp8_autocast
if is_megatron_lm_available():
from .utils import (
MegatronEngine,
@ -220,8 +216,8 @@ class Accelerator:
Set `True` if the learning rate scheduler is stepped at the same time as the optimizer, `False` if only
done under certain circumstances (at the end of each epoch, for instance).
kwargs_handlers (list of [`~utils.KwargsHandler`], *optional*)
A list of [`~utils.KwargsHandler`] to customize how the objects related to distributed training or mixed
precision are created. See [kwargs](kwargs) for more information.
A list of [`~utils.KwargsHandler`] to customize how the objects related to distributed training, profiling
or mixed precision are created. See [kwargs](kwargs) for more information.
dynamo_backend (`str` or [`~utils.DynamoBackend`], *optional*, defaults to `"no"`):
Set to one of the possible dynamo backends to optimize your training with torch dynamo.
gradient_accumulation_plugin ([`~utils.GradientAccumulationPlugin`], *optional*):
@ -298,6 +294,9 @@ class Accelerator:
if is_mlu_available():
if compare_versions("deepspeed-mlu", "<", "0.10.1"):
raise ImportError("DeepSpeed MLU version must be >= 0.10.1. Please update DeepSpeed MLU.")
elif is_musa_available():
if compare_versions("deepspeed", ">", "0.14.3"):
raise ImportError("DeepSpeed MUSA version must be <= 0.14.3. Please downgrade DeepSpeed.")
elif compare_versions("deepspeed", "<", "0.9.3"):
raise ImportError("DeepSpeed version must be >= 0.9.3. Please update DeepSpeed.")
@ -341,6 +340,7 @@ class Accelerator:
self.init_handler = None
self.fp8_recipe_handler = None
self.autocast_handler = None
self.profile_handler = None
self.has_lomo_optimizer = False
if kwargs_handlers is not None:
@ -373,6 +373,11 @@ class Accelerator:
raise ValueError("You can only pass one `AutocastKwargs` in `kwargs_handler`.")
else:
self.autocast_handler = handler
elif isinstance(handler, ProfileKwargs):
if self.profile_handler is not None:
raise ValueError("You can only pass one `ProfileKwargs` in `kwargs_handler`.")
else:
self.profile_handler = handler
kwargs = self.init_handler.to_kwargs() if self.init_handler is not None else {}
self.state = AcceleratorState(
@ -460,7 +465,7 @@ class Accelerator:
and self.distributed_type not in (DistributedType.DEEPSPEED, DistributedType.MEGATRON_LM)
):
self.native_amp = True
if self.device.type not in ("xpu", "cuda", "npu", "xla", "mlu") or is_torch_xla_available(
if self.device.type not in ("xpu", "cuda", "npu", "xla", "mlu", "musa") or is_torch_xla_available(
check_is_tpu=True
):
raise ValueError(f"fp16 mixed precision requires a GPU (not {self.device.type!r}).")
@ -473,6 +478,8 @@ class Accelerator:
self.scaler = xamp.GradScaler(**kwargs)
elif is_mlu_available():
self.scaler = torch.mlu.amp.GradScaler(**kwargs)
elif is_musa_available():
self.scaler = torch.musa.amp.GradScaler(**kwargs)
elif is_npu_available():
self.scaler = torch.npu.amp.GradScaler(**kwargs)
elif is_xpu_available():
@ -1117,6 +1124,7 @@ class Accelerator:
DistributedType.MULTI_GPU,
DistributedType.MULTI_NPU,
DistributedType.MULTI_MLU,
DistributedType.MULTI_MUSA,
DistributedType.MULTI_XPU,
):
dl_even_batches_values = []
@ -1377,6 +1385,10 @@ class Accelerator:
# We prepare fp8 after, allowing for bf16 autocast to happen first
if getattr(self.fp8_recipe_handler, "backend", None) == "TE":
# Import here to keep base imports fast
import transformer_engine.common.recipe as te_recipe
from transformer_engine.pytorch import fp8_autocast
if not has_transformer_engine_layers(model):
with torch.no_grad():
convert_model(model)
@ -1425,6 +1437,7 @@ class Accelerator:
if self.distributed_type in (
DistributedType.MULTI_GPU,
DistributedType.MULTI_MLU,
DistributedType.MULTI_MUSA,
DistributedType.MULTI_NPU,
DistributedType.MULTI_XPU,
):
@ -2980,6 +2993,7 @@ class Accelerator:
schedulers,
dataloaders,
self.state.process_index,
self.step,
self.scaler,
save_on_each_node=self.project_configuration.save_on_each_node,
safe_serialization=safe_serialization,
@ -3121,13 +3135,14 @@ class Accelerator:
if self.num_processes > 1 and self.distributed_type in (
DistributedType.MULTI_GPU,
DistributedType.MULTI_MLU,
DistributedType.MULTI_MUSA,
DistributedType.MULTI_NPU,
):
map_location = "on_device"
else:
map_location = "cpu"
load_accelerator_state(
override_attributes = load_accelerator_state(
input_dir,
models,
optimizers,
@ -3138,6 +3153,7 @@ class Accelerator:
map_location,
**load_model_func_kwargs,
)
self.step = override_attributes["step"]
custom_checkpoints = [
f for f in os.listdir(input_dir) if re.search(r"^custom_checkpoint_\d+\.pkl$", f) is not None
]
@ -3356,6 +3372,66 @@ class Accelerator:
yield
autocast_context.__exit__(*sys.exc_info())
@contextmanager
def profile(self, profile_handler: ProfileKwargs | None = None):
"""
Will profile the code inside the context manager. The profile will be saved to a Chrome Trace file if
`profile_handler.output_trace_dir` is set.
A different `profile_handler` can be passed in to override the one set in the `Accelerator` object.
Args:
profile_handler (`ProfileKwargs`, *optional*):
The profile handler to use for this context manager. If not passed, will use the one set in the
`Accelerator` object.
Example:
```python
# Profile with default settings
from accelerate import Accelerator
from accelerate.utils import ProfileKwargs
accelerator = Accelerator()
with accelerator.profile() as prof:
train()
accelerator.print(prof.key_averages().table())
# Profile with the custom handler
def custom_handler(prof):
print(prof.key_averages().table(sort_by="self_cpu_time_total", row_limit=10))
kwargs = ProfileKwargs(schedule_option=dict(wait=1, warmup=1, active=1), on_trace_ready=custom_handler)
accelerator = Accelerator(kwargs_handlers=[kwargs])
with accelerator.profile() as prof:
for _ in range(10):
train_iteration()
prof.step()
# Profile and export to Chrome Trace
kwargs = ProfileKwargs(output_trace_dir="output_trace")
accelerator = Accelerator(kwargs_handlers=[kwargs])
with accelerator.profile():
train()
```
"""
profile_handler = profile_handler or self.profile_handler or ProfileKwargs()
with profile_handler.build() as profiler:
yield profiler
if profile_handler.output_trace_dir is None:
return
os.makedirs(profile_handler.output_trace_dir, exist_ok=True)
profiler.export_chrome_trace(
os.path.join(profile_handler.output_trace_dir, PROFILE_PATTERN_NAME.format(suffix=self.process_index))
)
self.wait_for_everyone()
@property
def optimizer_step_was_skipped(self):
"""

View File

@ -38,6 +38,7 @@ from .utils import (
get_balanced_memory,
infer_auto_device_map,
is_mlu_available,
is_musa_available,
is_npu_available,
is_torch_version,
is_xpu_available,
@ -463,6 +464,8 @@ def dispatch_model(
model.npu = add_warning(model.npu, model)
elif is_mlu_available():
model.mlu = add_warning(model.mlu, model)
elif is_musa_available():
model.musa = add_warning(model.musa, model)
elif is_xpu_available():
model.xpu = add_warning(model.xpu, model)
else:
@ -483,6 +486,8 @@ def dispatch_model(
device = f"npu:{device}"
elif is_mlu_available() and isinstance(device, int):
device = f"mlu:{device}"
elif is_musa_available() and isinstance(device, int):
device = f"musa:{device}"
elif is_xpu_available() and isinstance(device, int):
device = f"xpu:{device}"
if device != "disk":

View File

@ -18,7 +18,7 @@ from typing import List
import numpy as np
import torch
from safetensors.torch import load_file
from safetensors.torch import load_model
from torch.cuda.amp import GradScaler
from .utils import (
@ -55,6 +55,7 @@ def save_accelerator_state(
schedulers: list,
dataloaders: list,
process_index: int,
step: int,
scaler: GradScaler = None,
save_on_each_node: bool = False,
safe_serialization: bool = True,
@ -82,6 +83,8 @@ def save_accelerator_state(
A list of dataloader instances to save their sampler states
process_index (`int`):
The current process index in the Accelerator state
step (`int`):
The current step in the internal step tracker
scaler (`torch.cuda.amp.GradScaler`, *optional*):
An optional gradient scaler instance to save
save_on_each_node (`bool`, *optional*):
@ -134,6 +137,7 @@ def save_accelerator_state(
# Random number generator states
states = {}
states_name = f"{RNG_STATE_NAME}_{process_index}.pkl"
states["step"] = step
states["random_state"] = random.getstate()
states["numpy_random_seed"] = np.random.get_state()
states["torch_manual_seed"] = torch.get_rng_state()
@ -180,7 +184,12 @@ def load_accelerator_state(
What device to load the optimizer state onto. Should be one of either "cpu" or "on_device".
load_model_func_kwargs (`dict`, *optional*):
Additional arguments that can be passed to the model's `load_state_dict` method.
Returns:
`dict`: Contains the `Accelerator` attributes to override while loading the state.
"""
# stores the `Accelerator` attributes to override
override_attributes = dict()
if map_location not in [None, "cpu", "on_device"]:
raise TypeError(
"Unsupported optimizer map location passed, please choose one of `None`, `'cpu'`, or `'on_device'`"
@ -196,12 +205,12 @@ def load_accelerator_state(
ending = f"_{i}" if i > 0 else ""
input_model_file = input_dir.joinpath(f"{SAFE_MODEL_NAME}{ending}.safetensors")
if input_model_file.exists():
state_dict = load_file(input_model_file, device=str(map_location))
load_model(model, input_model_file, device=str(map_location), **load_model_func_kwargs)
else:
# Load with torch
input_model_file = input_dir.joinpath(f"{MODEL_NAME}{ending}.bin")
state_dict = torch.load(input_model_file, map_location=map_location)
models[i].load_state_dict(state_dict, **load_model_func_kwargs)
model.load_state_dict(state_dict, **load_model_func_kwargs)
logger.info("All model weights loaded successfully")
# Optimizer states
@ -240,6 +249,7 @@ def load_accelerator_state(
# Random states
try:
states = torch.load(input_dir.joinpath(f"{RNG_STATE_NAME}_{process_index}.pkl"))
override_attributes["step"] = states["step"]
random.setstate(states["random_state"])
np.random.set_state(states["numpy_random_seed"])
torch.set_rng_state(states["torch_manual_seed"])
@ -253,6 +263,8 @@ def load_accelerator_state(
except Exception:
logger.info("Could not load random states")
return override_attributes
def save_custom_state(obj, path, index: int = 0, save_on_each_node: bool = False):
"""

View File

@ -22,6 +22,7 @@ from ...utils import (
is_deepspeed_available,
is_mlu_available,
is_mps_available,
is_musa_available,
is_npu_available,
is_transformers_available,
is_xpu_available,
@ -49,7 +50,16 @@ from .config_utils import (
def get_cluster_input():
distributed_type = _ask_options(
"Which type of machine are you using?",
["No distributed training", "multi-CPU", "multi-XPU", "multi-GPU", "multi-NPU", "multi-MLU", "TPU"],
[
"No distributed training",
"multi-CPU",
"multi-XPU",
"multi-GPU",
"multi-NPU",
"multi-MLU",
"multi-MUSA",
"TPU",
],
_convert_distributed_mode,
)
@ -66,6 +76,7 @@ def get_cluster_input():
if distributed_type in [
DistributedType.MULTI_GPU,
DistributedType.MULTI_MLU,
DistributedType.MULTI_MUSA,
DistributedType.MULTI_NPU,
DistributedType.MULTI_XPU,
DistributedType.MULTI_CPU,
@ -145,7 +156,13 @@ def get_cluster_input():
not use_cpu
and is_xpu_available()
and distributed_type
not in [DistributedType.MULTI_GPU, DistributedType.MULTI_NPU, DistributedType.MULTI_MLU, DistributedType.XLA]
not in [
DistributedType.MULTI_GPU,
DistributedType.MULTI_NPU,
DistributedType.MULTI_MLU,
DistributedType.XLA,
DistributedType.MULTI_MUSA,
]
):
ipex_config["use_xpu"] = _ask_field(
"Do you want to use XPU plugin to speed up training on XPU? [yes/NO]:",
@ -205,6 +222,7 @@ def get_cluster_input():
DistributedType.MULTI_XPU,
DistributedType.MULTI_NPU,
DistributedType.MULTI_MLU,
DistributedType.MULTI_MUSA,
DistributedType.NO,
]
and not use_mps
@ -358,6 +376,7 @@ def get_cluster_input():
DistributedType.MULTI_GPU,
DistributedType.MULTI_NPU,
DistributedType.MULTI_MLU,
DistributedType.MULTI_MUSA,
DistributedType.MULTI_XPU,
]:
use_fsdp = _ask_field(
@ -529,6 +548,7 @@ def get_cluster_input():
DistributedType.MULTI_XPU,
DistributedType.MULTI_GPU,
DistributedType.MULTI_MLU,
DistributedType.MULTI_MUSA,
DistributedType.MULTI_NPU,
DistributedType.XLA,
]:
@ -565,6 +585,7 @@ def get_cluster_input():
in [
DistributedType.MULTI_GPU,
DistributedType.MULTI_MLU,
DistributedType.MULTI_MUSA,
DistributedType.MULTI_NPU,
DistributedType.MULTI_XPU,
DistributedType.NO,
@ -576,6 +597,8 @@ def get_cluster_input():
machine_type = "NPU(s)"
elif is_mlu_available():
machine_type = "MLU(s)"
elif is_musa_available():
machine_type = "MUSA(s)"
else:
machine_type = "GPU(s)"
gpu_ids = _ask_field(

View File

@ -37,6 +37,8 @@ DYNAMO_BACKENDS = [
"FX2TRT",
"ONNXRT",
"TENSORRT",
"AOT_TORCHXLA_TRACE_ONCE",
"TORHCHXLA_TRACE_ONCE",
"IPEX",
"TVM",
]
@ -68,7 +70,9 @@ def _convert_compute_environment(value):
def _convert_distributed_mode(value):
value = int(value)
return DistributedType(["NO", "MULTI_CPU", "MULTI_XPU", "MULTI_GPU", "MULTI_NPU", "MULTI_MLU", "XLA"][value])
return DistributedType(
["NO", "MULTI_CPU", "MULTI_XPU", "MULTI_GPU", "MULTI_NPU", "MULTI_MLU", "MULTI_MUSA", "XLA"][value]
)
def _convert_dynamo_backend(value):

View File

@ -18,7 +18,7 @@ from pathlib import Path
import torch
from ...utils import is_mlu_available, is_npu_available, is_xpu_available
from ...utils import is_mlu_available, is_musa_available, is_npu_available, is_xpu_available
from .config_args import ClusterConfig, default_json_config_file
from .config_utils import SubcommandHelpFormatter
@ -65,6 +65,14 @@ def write_basic_config(mixed_precision="no", save_location: str = default_json_c
config["distributed_type"] = "MULTI_MLU"
else:
config["distributed_type"] = "NO"
elif is_musa_available():
num_musas = torch.musa.device_count()
config["num_processes"] = num_musas
config["use_cpu"] = False
if num_musas > 1:
config["distributed_type"] = "MULTI_MUSA"
else:
config["distributed_type"] = "NO"
elif torch.cuda.is_available():
num_gpus = torch.cuda.device_count()
config["num_processes"] = num_gpus

View File

@ -26,7 +26,7 @@ import torch
from accelerate import __version__ as version
from accelerate.commands.config import default_config_file, load_config_from_file
from ..utils import is_mlu_available, is_npu_available, is_xpu_available
from ..utils import is_mlu_available, is_musa_available, is_npu_available, is_xpu_available
def env_command_parser(subparsers=None):
@ -49,6 +49,7 @@ def env_command(args):
pt_cuda_available = torch.cuda.is_available()
pt_xpu_available = is_xpu_available()
pt_mlu_available = is_mlu_available()
pt_musa_available = is_musa_available()
pt_npu_available = is_npu_available()
accelerate_config = "Not found"
@ -75,6 +76,7 @@ def env_command(args):
"PyTorch XPU available": str(pt_xpu_available),
"PyTorch NPU available": str(pt_npu_available),
"PyTorch MLU available": str(pt_mlu_available),
"PyTorch MUSA available": str(pt_musa_available),
"System RAM": f"{psutil.virtual_memory().total / 1024 ** 3:.2f} GB",
}
if pt_cuda_available:

View File

@ -40,6 +40,7 @@ from accelerate.utils import (
is_bf16_available,
is_deepspeed_available,
is_mlu_available,
is_musa_available,
is_npu_available,
is_rich_available,
is_sagemaker_available,
@ -934,6 +935,7 @@ def _validate_launch_command(args):
DistributedType.MULTI_GPU,
DistributedType.MULTI_NPU,
DistributedType.MULTI_MLU,
DistributedType.MULTI_MUSA,
DistributedType.MULTI_XPU,
)
else False
@ -1013,6 +1015,8 @@ def _validate_launch_command(args):
args.num_processes = torch.xpu.device_count()
elif is_mlu_available():
args.num_processes = torch.mlu.device_count()
elif is_musa_available():
args.num_processes = torch.musa.device_count()
elif is_npu_available():
args.num_processes = torch.npu.device_count()
else:
@ -1020,11 +1024,16 @@ def _validate_launch_command(args):
warned.append(f"\t`--num_processes` was set to a value of `{args.num_processes}`")
if args.debug is None:
args.debug = False
if not args.multi_gpu and (
(args.use_xpu and is_xpu_available() and torch.xpu.device_count() > 1)
or (is_mlu_available() and torch.mlu.device_count() > 1)
or (is_npu_available() and torch.npu.device_count() > 1)
or (torch.cuda.device_count() > 1)
if (
not args.multi_gpu
and args.num_processes > 1
and (
(args.use_xpu and is_xpu_available() and torch.xpu.device_count() > 1)
or (is_mlu_available() and torch.mlu.device_count() > 1)
or (is_musa_available() and torch.musa.device_count() > 1)
or (is_npu_available() and torch.npu.device_count() > 1)
or (torch.cuda.device_count() > 1)
)
):
warned.append(
"\t\tMore than one GPU was found, enabling multi-GPU training.\n"

View File

@ -30,7 +30,7 @@ from .utils.modeling import get_non_persistent_buffers
from .utils.other import recursive_getattr
_accelerate_added_attributes = ["to", "cuda", "npu", "xpu", "mlu"]
_accelerate_added_attributes = ["to", "cuda", "npu", "xpu", "mlu", "musa"]
class ModelHook:

View File

@ -28,11 +28,6 @@ from .utils import (
)
if is_pippy_available():
from pippy.IR import Pipe, PipeSplitWrapper, annotate_split_points
from pippy.PipelineStage import PipelineStage
def generate_device_map(model, num_processes: int = 1, no_split_module_classes=None, max_memory: dict = None):
"""
Calculates the device map for `model` with an offset for PiPPy
@ -83,6 +78,10 @@ def build_pipeline(model, split_points, args, kwargs, num_chunks):
Users can pass in custom `num_chunks` as an optional hyper-parameter. By default will use
`AcceleratorState.num_processes`
"""
# Note: We import here to reduce import time from general modules, and isolate outside dependencies
from pippy.IR import Pipe, PipeSplitWrapper, annotate_split_points
from pippy.PipelineStage import PipelineStage
# We need to annotate the split points in the model for PiPPy
state = PartialState()
annotate_split_points(model, {split_point: PipeSplitWrapper.SplitPoint.BEGINNING for split_point in split_points})

View File

@ -26,8 +26,10 @@ from .utils import (
check_cuda_p2p_ib_support,
get_gpu_info,
is_mps_available,
is_torch_version,
patch_environment,
)
from .utils.constants import ELASTIC_LOG_LINE_PREFIX_TEMPLATE_PYTORCH_VERSION
def test_launch():
@ -50,6 +52,7 @@ def notebook_launcher(
rdzv_id="none",
max_restarts=0,
monitor_interval=0.1,
log_line_prefix_template=None,
):
"""
Launches a training function, using several processes or multiple nodes if it's possible in the current environment
@ -96,6 +99,8 @@ def notebook_launcher(
The maximum amount of restarts that elastic agent will conduct on workers before failure.
monitor_interval (`float`, *optional*, defaults to 0.1):
The interval in seconds that is used by the elastic_agent as a period of monitoring workers.
log_line_prefix_template (`str`, *optional*, defaults to `None`):
The prefix template for elastic launch logging. Available from PyTorch 2.2.0.
Example:
@ -223,7 +228,7 @@ def notebook_launcher(
rdzv_conf["rank"] = node_rank
if not rdzv_endpoint:
rdzv_endpoint = f"{master_addr}:{use_port}"
launch_config = LaunchConfig(
launch_config_kwargs = dict(
min_nodes=num_nodes,
max_nodes=num_nodes,
nproc_per_node=num_processes,
@ -234,9 +239,10 @@ def notebook_launcher(
max_restarts=max_restarts,
monitor_interval=monitor_interval,
start_method="fork",
log_line_prefix_template=os.environ.get("TORCHELASTIC_LOG_LINE_PREFIX_TEMPLATE"),
)
elastic_launch(config=launch_config, entrypoint=function)(*args)
if is_torch_version(">=", ELASTIC_LOG_LINE_PREFIX_TEMPLATE_PYTORCH_VERSION):
launch_config_kwargs["log_line_prefix_template"] = log_line_prefix_template
elastic_launch(config=LaunchConfig(**launch_config_kwargs), entrypoint=function)(*args)
except ProcessRaisedException as e:
if "Cannot re-initialize CUDA in forked subprocess" in e.args[0]:
raise RuntimeError(
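The `launchers.py` hunk above threads the new `log_line_prefix_template` argument through to `LaunchConfig` only when the installed PyTorch is recent enough. A minimal sketch of passing it from a notebook (the training function and prefix string are illustrative; the `${local_rank}` substitution follows torch elastic's prefix-template convention, an assumption to verify against the PyTorch docs):

```python
from accelerate import notebook_launcher

def training_loop():
    # regular Accelerate training code would go here
    ...

# Prefix each worker's log lines with its local rank; on older PyTorch versions the
# argument is simply not forwarded to LaunchConfig, per the version check above.
notebook_launcher(
    training_loop,
    num_processes=2,
    log_line_prefix_template="[rank ${local_rank}] ",
)
```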

View File

@ -70,6 +70,7 @@ class LocalSGD:
DistributedType.MULTI_CPU,
DistributedType.MULTI_GPU,
DistributedType.MULTI_MLU,
DistributedType.MULTI_MUSA,
DistributedType.MULTI_NPU,
]:
raise NotImplementedError("LocalSGD is supported only for CPUs and GPUs (no DeepSpeed or MegatronLM)")

View File

@ -40,6 +40,7 @@ from .utils import (
is_ipex_available,
is_mlu_available,
is_mps_available,
is_musa_available,
is_npu_available,
is_torch_xla_available,
is_xpu_available,
@ -56,6 +57,9 @@ if is_torch_xla_available():
if is_mlu_available(check_device=False):
import torch_mlu # noqa: F401
if is_musa_available(check_device=False):
import torch_musa # noqa: F401
if is_npu_available(check_device=False):
import torch_npu # noqa: F401
@ -195,11 +199,6 @@ class PartialState:
)
from deepspeed import comm as dist
if is_xpu_available() and is_ccl_available():
os.environ["CCL_PROCESS_LAUNCHER"] = "none"
os.environ["CCL_LOCAL_SIZE"] = os.environ.get("LOCAL_WORLD_SIZE", "1")
os.environ["CCL_LOCAL_RANK"] = os.environ.get("LOCAL_RANK", "0")
if not dist.is_initialized():
dist.init_distributed(dist_backend=self.backend, auto_mpi_discovery=False, **kwargs)
# We need to flag to `use_deepspeed` to be True to override `distributed_type` later
@ -217,10 +216,6 @@ class PartialState:
os.environ["WORLD_SIZE"] = str(dist_information.world_size)
os.environ["LOCAL_RANK"] = str(dist_information.local_rank)
os.environ["LOCAL_WORLD_SIZE"] = str(dist_information.local_world_size)
if self.backend == "ccl" and self.distributed_type == DistributedType.MULTI_XPU:
os.environ["CCL_PROCESS_LAUNCHER"] = "none"
os.environ["CCL_LOCAL_SIZE"] = os.environ["LOCAL_WORLD_SIZE"]
os.environ["CCL_LOCAL_RANK"] = os.environ["LOCAL_RANK"]
if not os.environ.get("MASTER_PORT", None):
os.environ["MASTER_PORT"] = "29500"
if (
@ -369,6 +364,7 @@ class PartialState:
if self.distributed_type in (
DistributedType.MULTI_GPU,
DistributedType.MULTI_MLU,
DistributedType.MULTI_MUSA,
DistributedType.MULTI_NPU,
DistributedType.MULTI_XPU,
DistributedType.MULTI_CPU,
@ -688,6 +684,7 @@ class PartialState:
- MPS if `torch.backends.mps.is_available()` and `torch.backends.mps.is_built()` both return True.
- CUDA if `torch.cuda.is_available()`
- MLU if `is_mlu_available()`
- MUSA if `is_musa_available()`
- NPU if `is_npu_available()`
- CPU otherwise
"""
@ -696,6 +693,8 @@ class PartialState:
return torch.device("mps")
elif is_mlu_available():
return torch.device("mlu")
elif is_musa_available():
return torch.device("musa")
elif torch.cuda.is_available():
return torch.device("cuda")
elif is_xpu_available():
@ -722,6 +721,9 @@ class PartialState:
if is_mlu_available():
backend = "cncl"
distributed_type = DistributedType.MULTI_MLU
elif is_musa_available():
backend = "mccl"
distributed_type = DistributedType.MULTI_MUSA
elif torch.cuda.is_available():
if backend is None:
backend = "nccl"
@ -769,7 +771,7 @@ class PartialState:
self.device = torch.device("cpu") if self._cpu else self.default_device
return
device = str(self.distributed_type).split(".")[-1].replace("MULTI_", "").lower()
if device not in ("cpu", "gpu", "mlu", "npu", "xpu", "xla"):
if device not in ("cpu", "gpu", "mlu", "musa", "npu", "xpu", "xla"):
raise ValueError(
f"Can't set device for {self.distributed_type} ({device}), verify we should be calling `_set_device()` for it!"
)
@ -778,16 +780,10 @@ class PartialState:
else:
if device == "gpu":
device = "cuda"
self.device = torch.device(device, self.local_process_index)
if self.device is not None:
if device == "xpu":
torch.xpu.set_device(self.device)
elif device == "mlu":
torch.mlu.set_device(self.device)
elif device == "npu":
torch.npu.set_device(self.device)
elif device == "cuda":
torch.cuda.set_device(self.device)
device_module = getattr(torch, device)
device_index = self.local_process_index % device_module.device_count()
self.device = torch.device(device, device_index)
device_module.set_device(self.device)
def __getattr__(self, name: str):
# By this point we know that no attributes of `self` contain `name`,
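With the refactor above, `_set_device` no longer special-cases each backend: `getattr(torch, device)` relies on every accelerator module exposing the same `device_count()`/`set_device()` surface, and the modulo lets several processes share one physical device. A minimal sketch of the idea with a CPU fallback so it runs anywhere:

```python
import torch


def pick_device(device_type: str, local_process_index: int) -> torch.device:
    if device_type == "cpu" or not hasattr(torch, device_type):
        return torch.device("cpu")
    device_module = getattr(torch, device_type)  # e.g. torch.cuda, torch.xpu, torch.mlu
    if not device_module.is_available():
        return torch.device("cpu")
    # Wrap around so more processes than devices can still be scheduled.
    device_index = local_process_index % device_module.device_count()
    device = torch.device(device_type, device_index)
    device_module.set_device(device)
    return device


print(pick_device("cuda", local_process_index=3))
```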
@ -894,6 +890,7 @@ class AcceleratorState:
elif self.distributed_type in [
DistributedType.MULTI_GPU,
DistributedType.MULTI_MLU,
DistributedType.MULTI_MUSA,
DistributedType.MULTI_NPU,
DistributedType.MULTI_XPU,
]:
@ -920,6 +917,12 @@ class AcceleratorState:
and self.device.type == "cuda"
):
torch.backends.cuda.matmul.allow_tf32 = True
if (
self.dynamo_plugin.backend != DynamoBackend.NO
and self._mixed_precision == "no"
and self.device.type == "musa"
):
torch.backends.musa.matmul.allow_tf32 = True
PartialState._shared_state["distributed_type"] = self.distributed_type
@property

View File

@ -29,6 +29,7 @@ from .testing import (
require_multi_device,
require_multi_gpu,
require_multi_xpu,
require_musa,
require_non_cpu,
require_non_torch_xla,
require_non_xpu,

View File

@ -23,7 +23,7 @@ from torch.utils.data import DataLoader
from transformers import AutoModelForSequenceClassification, AutoTokenizer, get_linear_schedule_with_warmup, set_seed
from accelerate import Accelerator, DistributedType
from accelerate.utils import is_mlu_available, is_npu_available, is_xpu_available
from accelerate.utils import is_mlu_available, is_musa_available, is_npu_available, is_xpu_available
from accelerate.utils.deepspeed import DummyOptim, DummyScheduler
@ -48,6 +48,10 @@ class TorchTracemalloc:
torch.mlu.empty_cache()
torch.mlu.reset_max_memory_allocated() # reset the peak gauge to zero
self.begin = torch.mlu.memory_allocated()
elif is_musa_available():
torch.musa.empty_cache()
torch.musa.reset_max_memory_allocated() # reset the peak gauge to zero
self.begin = torch.musa.memory_allocated()
elif is_npu_available():
torch.npu.empty_cache()
torch.npu.reset_max_memory_allocated() # reset the peak gauge to zero
@ -68,6 +72,10 @@ class TorchTracemalloc:
torch.mlu.empty_cache()
torch.mlu.memory_allocated() # reset the peak gauge to zero
self.begin = torch.mlu.max_memory_allocated()
elif is_musa_available():
torch.musa.empty_cache()
torch.musa.memory_allocated() # reset the peak gauge to zero
self.begin = torch.musa.max_memory_allocated()
elif is_npu_available():
torch.npu.empty_cache()
self.end = torch.npu.memory_allocated()
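The tracemalloc helper above simply grows one more backend branch for MUSA. For reference, a simplified, backend-agnostic sketch of the same peak-memory bookkeeping (CUDA when present, Python's `tracemalloc` otherwise; not the example script's class):

```python
import tracemalloc
from contextlib import contextmanager

import torch


@contextmanager
def track_peak_memory():
    use_cuda = torch.cuda.is_available()
    if use_cuda:
        torch.cuda.empty_cache()
        torch.cuda.reset_peak_memory_stats()
        begin = torch.cuda.memory_allocated()
    else:
        tracemalloc.start()
    yield
    if use_cuda:
        peak = torch.cuda.max_memory_allocated() - begin
    else:
        _, peak = tracemalloc.get_traced_memory()
        tracemalloc.stop()
    print(f"peak extra memory: {peak / 2**20:.2f} MiB")


with track_peak_memory():
    x = torch.randn(1024, 1024)
    y = x @ x
```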

View File

@ -37,6 +37,7 @@ from accelerate.utils import (
is_datasets_available,
is_ipex_available,
is_mlu_available,
is_musa_available,
is_npu_available,
is_pytest_available,
is_xpu_available,
@ -474,7 +475,7 @@ def training_check(use_seedable_sampler=False):
accelerator.print("Training yielded the same results on one CPU or distributes setup with batch split.")
if torch.cuda.is_available() or is_npu_available() or is_mlu_available():
if torch.cuda.is_available() or is_npu_available() or is_mlu_available() or is_musa_available():
# Mostly a test that FP16 doesn't crash as the operation inside the model is not converted to FP16
print("FP16 training check.")
AcceleratorState._reset_state()

View File

@ -343,6 +343,7 @@ def main():
DistributedType.MULTI_GPU,
DistributedType.MULTI_NPU,
DistributedType.MULTI_MLU,
DistributedType.MULTI_MUSA,
DistributedType.MULTI_CPU,
):
if state.local_process_index == 0:
@ -351,7 +352,12 @@ def main():
if state.local_process_index == 0:
print("**Test Distributed `no_sync` context manager with multiple forwards**")
test_distributed_sync_multiple_fwd(accelerator)
if state.distributed_type in (DistributedType.MULTI_GPU, DistributedType.MULTI_NPU, DistributedType.MULTI_MLU):
if state.distributed_type in (
DistributedType.MULTI_GPU,
DistributedType.MULTI_NPU,
DistributedType.MULTI_MLU,
DistributedType.MULTI_MUSA,
):
for split_batch in [True, False]:
for dispatch_batches in [True, False]:
for sync_each_batch in [True, False]:
@ -369,7 +375,12 @@ def main():
"`split_batches=False`, `dispatch_batches=False`, `sync_each_batch=False`**",
)
test_gradient_accumulation_with_opt_and_scheduler()
if state.distributed_type in (DistributedType.MULTI_GPU, DistributedType.MULTI_NPU, DistributedType.MULTI_MLU):
if state.distributed_type in (
DistributedType.MULTI_GPU,
DistributedType.MULTI_NPU,
DistributedType.MULTI_MLU,
DistributedType.MULTI_MUSA,
):
for split_batch in [True, False]:
for dispatch_batches in [True, False]:
for sync_each_batch in [True, False]:

View File

@ -40,8 +40,10 @@ from ..utils import (
is_datasets_available,
is_deepspeed_available,
is_dvclive_available,
is_import_timer_available,
is_mlu_available,
is_mps_available,
is_musa_available,
is_npu_available,
is_pandas_available,
is_pippy_available,
@ -52,6 +54,7 @@ from ..utils import (
is_torch_xla_available,
is_torchvision_available,
is_transformers_available,
is_triton_available,
is_wandb_available,
is_xpu_available,
str_to_bool,
@ -64,17 +67,19 @@ def get_backend():
elif is_cuda_available():
return "cuda", torch.cuda.device_count(), torch.cuda.memory_allocated
elif is_mps_available(min_version="2.0"):
return "mps", 1, torch.mps.current_allocated_memory()
return "mps", 1, torch.mps.current_allocated_memory
elif is_mps_available():
return "mps", 1, 0
return "mps", 1, lambda: 0
elif is_mlu_available():
return "mlu", torch.mlu.device_count(), torch.mlu.memory_allocated
elif is_musa_available():
return "musa", torch.musa.device_count(), torch.musa.memory_allocated
elif is_npu_available():
return "npu", torch.npu.device_count(), torch.npu.memory_allocated
elif is_xpu_available():
return "xpu", torch.xpu.device_count(), torch.xpu.memory_allocated
else:
return "cpu", 1, 0
return "cpu", 1, lambda: 0
torch_device, device_count, memory_allocated_func = get_backend()
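The point of the `lambda: 0` change above is that the third element of the tuple is a *callable*: callers always invoke `memory_allocated_func()`, so returning a bare `0` (or eagerly calling `torch.mps.current_allocated_memory()`) breaks the contract. A reduced sketch of that contract:

```python
import torch


def get_backend_sketch():
    """Return (device name, device count, callable reporting allocated bytes)."""
    if torch.cuda.is_available():
        return "cuda", torch.cuda.device_count(), torch.cuda.memory_allocated
    # The fallback must also be callable so the caller can always write `memory_allocated_func()`.
    return "cpu", 1, lambda: 0


device, count, memory_allocated_func = get_backend_sketch()
print(device, count, memory_allocated_func())
```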
@ -178,6 +183,13 @@ def require_mlu(test_case):
return unittest.skipUnless(is_mlu_available(), "test require a MLU")(test_case)
def require_musa(test_case):
"""
Decorator marking a test that requires MUSA. These tests are skipped when there is no MUSA available.
"""
return unittest.skipUnless(is_musa_available(), "test requires a MUSA")(test_case)
def require_npu(test_case):
"""
Decorator marking a test that requires NPU. These tests are skipped when there are no NPU available.
@ -212,7 +224,7 @@ def require_transformers(test_case):
def require_timm(test_case):
"""
Decorator marking a test that requires transformers. These tests are skipped when they are not.
Decorator marking a test that requires timm. These tests are skipped when they are not.
"""
return unittest.skipUnless(is_timm_available(), "test requires the timm library")(test_case)
@ -224,6 +236,13 @@ def require_torchvision(test_case):
return unittest.skipUnless(is_torchvision_available(), "test requires the torchvision library")(test_case)
def require_triton(test_case):
"""
Decorator marking a test that requires triton. These tests are skipped when they are not.
"""
return unittest.skipUnless(is_triton_available(), "test requires the triton library")(test_case)
def require_schedulefree(test_case):
"""
Decorator marking a test that requires schedulefree. These tests are skipped when they are not.
@ -377,6 +396,14 @@ def require_pippy(test_case):
return unittest.skipUnless(is_pippy_available(), "test requires pippy")(test_case)
def require_import_timer(test_case):
"""
Decorator marking a test that requires the tuna interpreter to be installed. These tests are skipped when tuna isn't
installed
"""
return unittest.skipUnless(is_import_timer_available(), "test requires tuna interpreter")(test_case)
_atleast_one_tracker_available = (
any([is_wandb_available(), is_tensorboard_available()]) and not is_comet_ml_available()
)
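The new `require_musa`, `require_triton` and `require_import_timer` helpers all follow the same `unittest.skipUnless` pattern. A self-contained, hypothetical example of building and using such a decorator (the `has_musa()` probe below is a stand-in for `is_musa_available()`):

```python
import unittest


def has_musa() -> bool:
    try:
        import torch_musa  # noqa: F401

        return True
    except ImportError:
        return False


def require_musa_sketch(test_case):
    """Skip the decorated test unless a MUSA backend is importable."""
    return unittest.skipUnless(has_musa(), "test requires a MUSA device")(test_case)


class ExampleTests(unittest.TestCase):
    @require_musa_sketch
    def test_musa_only(self):
        self.assertTrue(True)


if __name__ == "__main__":
    unittest.main()
```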

View File

@ -14,6 +14,7 @@
from .constants import (
MODEL_NAME,
OPTIMIZER_NAME,
PROFILE_PATTERN_NAME,
RNG_STATE_NAME,
SAFE_MODEL_NAME,
SAFE_WEIGHTS_INDEX_NAME,
@ -48,6 +49,7 @@ from .dataclasses import (
LoggerType,
MegatronLMPlugin,
PrecisionType,
ProfileKwargs,
ProjectConfiguration,
RNGType,
SageMakerDistributedType,
@ -84,6 +86,7 @@ from .imports import (
is_deepspeed_available,
is_dvclive_available,
is_fp8_available,
is_import_timer_available,
is_ipex_available,
is_lomo_available,
is_megatron_lm_available,
@ -91,6 +94,7 @@ from .imports import (
is_mlu_available,
is_mps_available,
is_msamp_available,
is_musa_available,
is_npu_available,
is_pandas_available,
is_peft_available,
@ -106,6 +110,7 @@ from .imports import (
is_torchvision_available,
is_transformer_engine_available,
is_transformers_available,
is_triton_available,
is_wandb_available,
is_xpu_available,
)
@ -193,24 +198,31 @@ from .launch import (
prepare_simple_launcher_cmd_env,
prepare_tpu,
)
# For docs
from .megatron_lm import (
AbstractTrainStep,
BertTrainStep,
GPTTrainStep,
MegatronEngine,
MegatronLMDummyDataLoader,
MegatronLMDummyScheduler,
MegatronLMOptimizerWrapper,
MegatronLMSchedulerWrapper,
T5TrainStep,
avg_losses_across_data_parallel_group,
gather_across_data_parallel_groups,
)
from .megatron_lm import initialize as megatron_lm_initialize
from .megatron_lm import prepare_data_loader as megatron_lm_prepare_data_loader
from .megatron_lm import prepare_model_optimizer_scheduler as megatron_lm_prepare_model_optimizer_scheduler
from .megatron_lm import prepare_optimizer as megatron_lm_prepare_optimizer
from .megatron_lm import prepare_scheduler as megatron_lm_prepare_scheduler
if is_megatron_lm_available():
from .megatron_lm import (
MegatronEngine,
MegatronLMOptimizerWrapper,
MegatronLMSchedulerWrapper,
gather_across_data_parallel_groups,
)
from .megatron_lm import initialize as megatron_lm_initialize
from .megatron_lm import prepare_data_loader as megatron_lm_prepare_data_loader
from .megatron_lm import prepare_model_optimizer_scheduler as megatron_lm_prepare_model_optimizer_scheduler
from .megatron_lm import prepare_optimizer as megatron_lm_prepare_optimizer
from .megatron_lm import prepare_scheduler as megatron_lm_prepare_scheduler
from .memory import find_executable_batch_size, release_memory
from .other import (
check_os_kernel,

View File

@ -22,6 +22,7 @@ RNG_STATE_NAME = "random_states"
OPTIMIZER_NAME = "optimizer"
SCHEDULER_NAME = "scheduler"
SAMPLER_NAME = "sampler"
PROFILE_PATTERN_NAME = "profile_{suffix}.json"
WEIGHTS_NAME = f"{MODEL_NAME}.bin"
WEIGHTS_PATTERN_NAME = "pytorch_model{suffix}.bin"
WEIGHTS_INDEX_NAME = f"{WEIGHTS_NAME}.index.json"
@ -40,6 +41,7 @@ FSDP_PYTORCH_VERSION = "2.1.0"
FSDP_MODEL_NAME = "pytorch_model_fsdp"
DEEPSPEED_MULTINODE_LAUNCHERS = ["pdsh", "standard", "openmpi", "mvapich", "mpich"]
TORCH_DYNAMO_MODES = ["default", "reduce-overhead", "max-autotune"]
ELASTIC_LOG_LINE_PREFIX_TEMPLATE_PYTORCH_VERSION = "2.2.0"
STR_OPERATION_TO_FUNC = {">": op.gt, ">=": op.ge, "==": op.eq, "!=": op.ne, "<=": op.le, "<": op.lt}
@ -71,4 +73,10 @@ TORCH_LAUNCH_PARAMS = [
]
CUDA_DISTRIBUTED_TYPES = ["DEEPSPEED", "MULTI_GPU", "FSDP", "MEGATRON_LM"]
TORCH_DISTRIBUTED_OPERATION_TYPES = CUDA_DISTRIBUTED_TYPES + ["MULTI_NPU", "MULTI_MLU", "MULTI_XPU", "MULTI_CPU"]
TORCH_DISTRIBUTED_OPERATION_TYPES = CUDA_DISTRIBUTED_TYPES + [
"MULTI_NPU",
"MULTI_MLU",
"MULTI_MUSA",
"MULTI_XPU",
"MULTI_CPU",
]

View File

@ -350,6 +350,117 @@ class FP8RecipeKwargs(KwargsHandler):
raise ValueError(f"`optimization_level` must be one of {' or '.join(get_args(OptLevel))}")
# Literal
ProfilerActivity = Literal["cpu", "xpu", "mtia", "cuda"]
@dataclass
class ProfileKwargs(KwargsHandler):
"""
Use this object in your [`Accelerator`] to customize the initialization of the profiler. Please refer to the
documentation of this [context manager](https://pytorch.org/docs/stable/profiler.html#torch.profiler.profile) for
more information on each argument.
<Tip warning={true}>
`torch.profiler` is only available in PyTorch 1.8.1 and later versions.
</Tip>
Example:
```python
from accelerate import Accelerator
from accelerate.utils import ProfileKwargs
kwargs = ProfileKwargs(activities=["cpu", "cuda"])
accelerator = Accelerator(kwargs_handlers=[kwargs])
```
Args:
activities (`List[str]`, *optional*, default to `None`):
The list of activity groups to use in profiling. Must be one of `"cpu"`, `"xpu"`, `"mtia"`, or `"cuda"`.
schedule_option (`Dict[str, int]`, *optional*, default to `None`):
The schedule option to use for the profiler. Available keys are `wait`, `warmup`, `active`, `repeat` and
`skip_first`. The profiler will skip the first `skip_first` steps, then wait for `wait` steps, then do the
warmup for the next `warmup` steps, then do the active recording for the next `active` steps and then
repeat the cycle starting with `wait` steps. The optional number of cycles is specified with the `repeat`
parameter; a value of zero means the cycles will continue until the profiling is finished.
on_trace_ready (`Callable`, *optional*, default to `None`):
Callable that is called at each step when schedule returns `ProfilerAction.RECORD_AND_SAVE` during the
profiling.
record_shapes (`bool`, *optional*, default to `False`):
Save information about operators input shapes.
profile_memory (`bool`, *optional*, default to `False`):
Track tensor memory allocation/deallocation
with_stack (`bool`, *optional*, default to `False`):
Record source information (file and line number) for the ops.
with_flops (`bool`, *optional*, default to `False`):
Use formula to estimate the FLOPS of specific operators
with_modules (`bool`, *optional*, default to `False`):
Record module hierarchy (including function names) corresponding to the callstack of the op.
output_trace_dir (`str`, *optional*, default to `None`):
Exports the collected trace in Chrome JSON format; open the file with Chrome's 'chrome://tracing' viewer. Defaults
to None, which means profiling does not store JSON files.
"""
activities: Optional[List[ProfilerActivity]] = None
schedule_option: Optional[Dict[str, int]] = None
on_trace_ready: Optional[Callable] = None
record_shapes: bool = False
profile_memory: bool = False
with_stack: bool = False
with_flops: bool = False
with_modules: bool = False
output_trace_dir: Optional[str] = None
def _get_profiler_activity(self, activity: ProfilerActivity) -> torch.profiler.ProfilerActivity:
"""Get the profiler activity from the string.
Args:
activity (str): The profiler activity name.
Returns:
torch.profiler.ProfilerActivity: The profiler activity.
"""
profiler_activity_map: dict[str, torch.profiler.ProfilerActivity] = {
"cpu": torch.profiler.ProfilerActivity.CPU,
"xpu": torch.profiler.ProfilerActivity.XPU,
"mita": torch.profiler.ProfilerActivity.MTIA,
"cuda": torch.profiler.ProfilerActivity.CUDA,
}
if activity not in profiler_activity_map:
raise ValueError(f"Invalid profiler activity: {activity}. Must be one of {list(profiler_activity_map)}.")
return profiler_activity_map[activity]
def build(self) -> torch.profiler.profile:
"""
Build a profiler object with the current configuration.
Returns:
torch.profiler.profile: The profiler object.
"""
activities: Optional[List[ProfilerActivity]] = None
if self.activities is not None:
activities = [self._get_profiler_activity(activity) for activity in self.activities]
schedule: Optional[torch.profiler.schedule] = None
if self.schedule_option is not None:
schedule = torch.profiler.schedule(**self.schedule_option)
return torch.profiler.profile(
activities=activities,
schedule=schedule,
on_trace_ready=self.on_trace_ready,
record_shapes=self.record_shapes,
profile_memory=self.profile_memory,
with_stack=self.with_stack,
with_flops=self.with_flops,
with_modules=self.with_modules,
)
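A short usage sketch for the new `ProfileKwargs`, combining a schedule with the `accelerator.profile()` context manager exercised in the tests further below (step counts and table options here are arbitrary illustrations):

```python
import torch

from accelerate import Accelerator
from accelerate.utils import ProfileKwargs

profile_kwargs = ProfileKwargs(
    activities=["cpu"],
    schedule_option=dict(wait=1, warmup=1, active=2, repeat=1),
    record_shapes=True,
)
accelerator = Accelerator(kwargs_handlers=[profile_kwargs])

with accelerator.profile() as prof:
    for _ in range(8):
        a = torch.randn(256, 256, device=accelerator.device)
        _ = a @ a
        prof.step()  # advances the wait/warmup/active schedule

print(prof.key_averages().table(sort_by="cpu_time_total", row_limit=5))
```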
class DeprecatedFieldDescriptor:
"""
Descriptor for deprecated fields in an enum class.
@ -384,6 +495,7 @@ class DistributedType(str, enum.Enum):
- **MULTI_CPU** -- Distributed on multiple CPU nodes.
- **MULTI_GPU** -- Distributed on multiple GPUs.
- **MULTI_MLU** -- Distributed on multiple MLUs.
- **MULTI_MUSA** -- Distributed on multiple MUSAs.
- **MULTI_NPU** -- Distributed on multiple NPUs.
- **MULTI_XPU** -- Distributed on multiple XPUs.
- **DEEPSPEED** -- Using DeepSpeed.
@ -397,6 +509,7 @@ class DistributedType(str, enum.Enum):
MULTI_GPU = "MULTI_GPU"
MULTI_NPU = "MULTI_NPU"
MULTI_MLU = "MULTI_MLU"
MULTI_MUSA = "MULTI_MUSA"
MULTI_XPU = "MULTI_XPU"
DEEPSPEED = "DEEPSPEED"
FSDP = "FSDP"
@ -463,6 +576,10 @@ class DynamoBackend(str, BaseEnum):
- **ONNXRT** -- Uses ONNXRT for inference on CPU/GPU. Inference only. [Read more](https://onnxruntime.ai/)
- **TENSORRT** -- Uses ONNXRT to run TensorRT for inference optimizations. [Read
more](https://github.com/onnx/onnx-tensorrt)
- **AOT_TORCHXLA_TRACE_ONCE** -- Uses Pytorch/XLA with TorchDynamo optimization, for training. [Read
more](https://github.com/pytorch/xla/blob/r2.0/docs/dynamo.md)
- **TORCHXLA_TRACE_ONCE** -- Uses Pytorch/XLA with TorchDynamo optimization, for inference. [Read
more](https://github.com/pytorch/xla/blob/r2.0/docs/dynamo.md)
- **IPEX** -- Uses IPEX for inference on CPU. Inference only. [Read
more](https://github.com/intel/intel-extension-for-pytorch).
- **TVM** -- Uses Apache TVM for inference optimizations. [Read more](https://tvm.apache.org/)
@ -481,6 +598,8 @@ class DynamoBackend(str, BaseEnum):
FX2TRT = "FX2TRT"
ONNXRT = "ONNXRT"
TENSORRT = "TENSORRT"
AOT_TORCHXLA_TRACE_ONCE = "AOT_TORCHXLA_TRACE_ONCE"
TORCHXLA_TRACE_ONCE = "TORCHXLA_TRACE_ONCE"
IPEX = "IPEX"
TVM = "TVM"
@ -527,6 +646,7 @@ class RNGType(BaseEnum):
TORCH = "torch"
CUDA = "cuda"
MLU = "mlu"
MUSA = "musa"
NPU = "npu"
XLA = "xla"
XPU = "xpu"

View File

@ -18,24 +18,12 @@ from pathlib import Path
import torch
from ..logging import get_logger
from .constants import FSDP_MODEL_NAME, FSDP_PYTORCH_VERSION, OPTIMIZER_NAME, SAFE_WEIGHTS_NAME, WEIGHTS_NAME
from .imports import is_torch_distributed_available
from .constants import FSDP_MODEL_NAME, OPTIMIZER_NAME, SAFE_WEIGHTS_NAME, WEIGHTS_NAME
from .modeling import is_peft_model
from .other import save
from .versions import is_torch_version
if is_torch_version(">=", FSDP_PYTORCH_VERSION) and is_torch_distributed_available():
import torch.distributed.checkpoint as dist_cp
from torch.distributed.checkpoint.default_planner import DefaultLoadPlanner, DefaultSavePlanner
from torch.distributed.checkpoint.optimizer import load_sharded_optimizer_state_dict
from torch.distributed.fsdp.fully_sharded_data_parallel import FullyShardedDataParallel as FSDP
from torch.distributed.fsdp.fully_sharded_data_parallel import StateDictType
# `dist_cp_format_utils is only available from pt>=2.3.0
if is_torch_version(">=", "2.3.0") and is_torch_distributed_available():
import torch.distributed.checkpoint.format_utils as dist_cp_format_utils
logger = get_logger(__name__)
@ -58,8 +46,13 @@ def _set_model_state_dict(model, state_dict, adapter_only=False):
def save_fsdp_model(fsdp_plugin, accelerator, model, output_dir, model_index=0, adapter_only=False):
os.makedirs(output_dir, exist_ok=True)
# Note: We import here to reduce import time from general modules, and isolate outside dependencies
import torch.distributed.checkpoint as dist_cp
from torch.distributed.checkpoint.default_planner import DefaultSavePlanner
from torch.distributed.fsdp.fully_sharded_data_parallel import FullyShardedDataParallel as FSDP
from torch.distributed.fsdp.fully_sharded_data_parallel import StateDictType
os.makedirs(output_dir, exist_ok=True)
if fsdp_plugin.state_dict_type == StateDictType.FULL_STATE_DICT:
# FSDP raises error when single GPU is used with `offload_to_cpu=True` for FULL_STATE_DICT
# so, only enable it when num_processes>1
@ -103,6 +96,12 @@ def save_fsdp_model(fsdp_plugin, accelerator, model, output_dir, model_index=0,
def load_fsdp_model(fsdp_plugin, accelerator, model, input_dir, model_index=0, adapter_only=False):
# Note: We import here to reduce import time from general modules, and isolate outside dependencies
import torch.distributed.checkpoint as dist_cp
from torch.distributed.checkpoint.default_planner import DefaultLoadPlanner
from torch.distributed.fsdp.fully_sharded_data_parallel import FullyShardedDataParallel as FSDP
from torch.distributed.fsdp.fully_sharded_data_parallel import StateDictType
accelerator.wait_for_everyone()
if fsdp_plugin.state_dict_type == StateDictType.FULL_STATE_DICT:
# FSDP raises error when single GPU is used with `offload_to_cpu=True` for FULL_STATE_DICT
@ -156,6 +155,12 @@ def load_fsdp_model(fsdp_plugin, accelerator, model, input_dir, model_index=0, a
def save_fsdp_optimizer(fsdp_plugin, accelerator, optimizer, model, output_dir, optimizer_index=0):
# Note: We import here to reduce import time from general modules, and isolate outside dependencies
import torch.distributed.checkpoint as dist_cp
from torch.distributed.checkpoint.default_planner import DefaultSavePlanner
from torch.distributed.fsdp.fully_sharded_data_parallel import FullyShardedDataParallel as FSDP
from torch.distributed.fsdp.fully_sharded_data_parallel import StateDictType
os.makedirs(output_dir, exist_ok=True)
with FSDP.state_dict_type(
model, fsdp_plugin.state_dict_type, fsdp_plugin.state_dict_config, fsdp_plugin.optim_state_dict_config
@ -183,6 +188,12 @@ def save_fsdp_optimizer(fsdp_plugin, accelerator, optimizer, model, output_dir,
def load_fsdp_optimizer(fsdp_plugin, accelerator, optimizer, model, input_dir, optimizer_index=0, adapter_only=False):
# Note: We import here to reduce import time from general modules, and isolate outside dependencies
import torch.distributed.checkpoint as dist_cp
from torch.distributed.checkpoint.optimizer import load_sharded_optimizer_state_dict
from torch.distributed.fsdp.fully_sharded_data_parallel import FullyShardedDataParallel as FSDP
from torch.distributed.fsdp.fully_sharded_data_parallel import StateDictType
accelerator.wait_for_everyone()
with FSDP.state_dict_type(
model, fsdp_plugin.state_dict_type, fsdp_plugin.state_dict_config, fsdp_plugin.optim_state_dict_config
@ -221,6 +232,10 @@ def _distributed_checkpoint_to_merged_weights(checkpoint_dir: str, save_path: st
Will save under `save_path` as either `model.safetensors` or `pytorch_model.bin`.
"""
# Note: We import here to reduce import time from general modules, and isolate outside dependencies
import torch.distributed.checkpoint as dist_cp
import torch.distributed.checkpoint.format_utils as dist_cp_format_utils
state_dict = {}
save_path = Path(save_path)
save_path.mkdir(exist_ok=True)
@ -259,11 +274,29 @@ def merge_fsdp_weights(
remove_checkpoint_dir (`bool`, *optional*, defaults to `False`):
Whether to remove the checkpoint directory after merging.
"""
checkpoint_dir = Path(checkpoint_dir)
from accelerate.state import PartialState
if not is_torch_version(">=", "2.3.0"):
raise ValueError("`merge_fsdp_weights` requires PyTorch >= 2.3.0`")
# Verify that the checkpoint directory exists
if not checkpoint_dir.exists():
model_path_exists = (checkpoint_dir / "pytorch_model_fsdp_0").exists()
optimizer_path_exists = (checkpoint_dir / "optimizer_0").exists()
err = f"Tried to load from {checkpoint_dir} but couldn't find a valid metadata file."
if model_path_exists and optimizer_path_exists:
err += " However, potential model and optimizer checkpoint directories exist."
err += f" Please pass in either {checkpoint_dir}/pytorch_model_fsdp_0 or {checkpoint_dir}/optimizer_0"
err += " instead."
elif model_path_exists:
err += " However, a potential model checkpoint directory exists."
err += f" Please try passing in {checkpoint_dir}/pytorch_model_fsdp_0 instead."
elif optimizer_path_exists:
err += " However, a potential optimizer checkpoint directory exists."
err += f" Please try passing in {checkpoint_dir}/optimizer_0 instead."
raise ValueError(err)
# To setup `save` to work
state = PartialState()
if state.is_main_process:
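The guard added above turns a cryptic failure into a hint that the user probably passed the parent checkpoint directory instead of the sharded sub-directory. A hedged, stand-alone sketch of that validation (the `.metadata` marker is an assumption about the torch.distributed.checkpoint layout, not taken from the hunk):

```python
from pathlib import Path


def validate_fsdp_checkpoint_dir(checkpoint_dir: str, metadata_name: str = ".metadata") -> Path:
    checkpoint_dir = Path(checkpoint_dir)
    if (checkpoint_dir / metadata_name).exists():
        return checkpoint_dir
    err = f"Tried to load from {checkpoint_dir} but couldn't find a valid metadata file."
    model_subdir = checkpoint_dir / "pytorch_model_fsdp_0"
    optimizer_subdir = checkpoint_dir / "optimizer_0"
    # Point the user at the sharded sub-directories if the parent was passed by mistake.
    if model_subdir.exists():
        err += f" A model checkpoint directory exists; try passing in {model_subdir} instead."
    if optimizer_subdir.exists():
        err += f" An optimizer checkpoint directory exists; try passing in {optimizer_subdir} instead."
    raise ValueError(err)


try:
    validate_fsdp_checkpoint_dir("outputs/checkpoint_0")
except ValueError as e:
    print(e)
```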

View File

@ -81,8 +81,12 @@ def get_ccl_version():
return importlib.metadata.version("oneccl_bind_pt")
def is_import_timer_available():
return _is_package_available("import_timer")
def is_pynvml_available():
return _is_package_available("pynvml")
return _is_package_available("pynvml") or _is_package_available("pynvml", "nvidia-ml-py")
def is_pytest_available():
@ -244,6 +248,10 @@ def is_timm_available():
return _is_package_available("timm")
def is_triton_available():
return _is_package_available("triton")
def is_aim_available():
package_exists = _is_package_available("aim")
if package_exists:
@ -355,6 +363,24 @@ def is_mlu_available(check_device=False):
return hasattr(torch, "mlu") and torch.mlu.is_available()
@lru_cache
def is_musa_available(check_device=False):
"Checks if `torch_musa` is installed and potentially if a MUSA is in the environment"
if importlib.util.find_spec("torch_musa") is None:
return False
import torch_musa # noqa: F401
if check_device:
try:
# Will raise a RuntimeError if no MUSA is found
_ = torch.musa.device_count()
return torch.musa.is_available()
except RuntimeError:
return False
return hasattr(torch, "musa") and torch.musa.is_available()
@lru_cache
def is_npu_available(check_device=False):
"Checks if `torch_npu` is installed and potentially if a NPU is in the environment"

View File

@ -29,6 +29,7 @@ from ..utils import (
PrecisionType,
is_ipex_available,
is_mlu_available,
is_musa_available,
is_npu_available,
is_torch_xla_available,
is_xpu_available,
@ -67,10 +68,10 @@ def _get_mpirun_args():
mpirun_version = subprocess.check_output([mpi_app, "--version"])
if b"Open MPI" in mpirun_version:
return mpi_app, "--hostfile", "-n", "--npernode"
return mpi_app, "--hostfile", "-n", "--npernode", "--bind-to"
else:
# Intel MPI and MVAPICH both use the same arg names
return mpi_app, "-f", "-n", "-ppn"
return mpi_app, "-f", "-n", "-ppn", ""
def prepare_simple_launcher_cmd_env(args: argparse.Namespace) -> Tuple[List[str], Dict[str, str]]:
@ -82,14 +83,23 @@ def prepare_simple_launcher_cmd_env(args: argparse.Namespace) -> Tuple[List[str]
raise ValueError("--module and --no_python cannot be used together")
if args.mpirun_hostfile is not None:
mpi_app_name, hostfile_arg, num_proc_arg, proc_per_node_arg = _get_mpirun_args()
mpi_app_name, hostfile_arg, num_proc_arg, proc_per_node_arg, bind_to_arg = _get_mpirun_args()
mpirun_ccl = getattr(args, "mpirun_ccl", None)
bind_to = getattr(args, "bind-to", "socket")
num_machines = args.num_machines
num_processes = getattr(args, "num_processes", None)
nproc_per_node = str(num_processes // num_machines) if num_processes and num_machines else "1"
cmd += [mpi_app_name, hostfile_arg, args.mpirun_hostfile, proc_per_node_arg, nproc_per_node]
cmd += [
mpi_app_name,
hostfile_arg,
args.mpirun_hostfile,
proc_per_node_arg,
nproc_per_node,
]
if num_processes:
cmd += [num_proc_arg, str(num_processes)]
if bind_to_arg:
cmd += [bind_to_arg, bind_to]
if not args.no_python:
cmd.append(sys.executable)
if args.module:
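`_get_mpirun_args` now also returns a binding flag, which is an empty string for Intel MPI/MVAPICH, so the launcher appends it only when it is non-empty. A small sketch of assembling such a command line (flag names copied from the hunk; the hostfile path is a placeholder):

```python
def build_mpirun_cmd(open_mpi, hostfile, num_processes, num_machines, bind_to="socket"):
    if open_mpi:
        mpi_app, hostfile_arg, num_proc_arg, proc_per_node_arg, bind_to_arg = (
            "mpirun", "--hostfile", "-n", "--npernode", "--bind-to",
        )
    else:
        # Intel MPI and MVAPICH share argument names and take no separate bind flag here.
        mpi_app, hostfile_arg, num_proc_arg, proc_per_node_arg, bind_to_arg = ("mpirun", "-f", "-n", "-ppn", "")
    nproc_per_node = str(num_processes // num_machines) if num_processes and num_machines else "1"
    cmd = [mpi_app, hostfile_arg, hostfile, proc_per_node_arg, nproc_per_node, num_proc_arg, str(num_processes)]
    if bind_to_arg:  # skip the flag entirely when this MPI flavour does not use one
        cmd += [bind_to_arg, bind_to]
    return cmd


print(" ".join(build_mpirun_cmd(True, "hostfile.txt", 8, 2)))
print(" ".join(build_mpirun_cmd(False, "hostfile.txt", 8, 2)))
```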
@ -106,6 +116,8 @@ def prepare_simple_launcher_cmd_env(args: argparse.Namespace) -> Tuple[List[str]
current_env["ZE_AFFINITY_MASK"] = args.gpu_ids
elif is_mlu_available():
current_env["MLU_VISIBLE_DEVICES"] = args.gpu_ids
elif is_musa_available():
current_env["MUSA_VISIBLE_DEVICES"] = args.gpu_ids
elif is_npu_available():
current_env["ASCEND_RT_VISIBLE_DEVICES"] = args.gpu_ids
else:
@ -115,7 +127,7 @@ def prepare_simple_launcher_cmd_env(args: argparse.Namespace) -> Tuple[List[str]
current_env["MASTER_PORT"] = str(args.main_process_port)
if args.mpirun_hostfile is not None:
current_env["CCL_WORKER_COUNT"] = mpirun_ccl
current_env["CCL_WORKER_COUNT"] = str(mpirun_ccl)
elif args.num_processes > 1:
current_env["MASTER_ADDR"] = args.main_process_ip if args.main_process_ip is not None else "127.0.0.1"
current_env["MASTER_PORT"] = str(args.main_process_port) if args.main_process_port is not None else "29500"
@ -200,6 +212,8 @@ def prepare_multi_gpu_env(args: argparse.Namespace) -> Dict[str, str]:
current_env["ZE_AFFINITY_MASK"] = gpu_ids
elif is_mlu_available():
current_env["MLU_VISIBLE_DEVICES"] = gpu_ids
elif is_musa_available():
current_env["MUSA_VISIBLE_DEVICES"] = gpu_ids
elif is_npu_available():
current_env["ASCEND_RT_VISIBLE_DEVICES"] = gpu_ids
else:
@ -361,6 +375,8 @@ def prepare_deepspeed_cmd_env(args: argparse.Namespace) -> Tuple[List[str], Dict
current_env["ZE_AFFINITY_MASK"] = gpu_ids
elif is_mlu_available():
current_env["MLU_VISIBLE_DEVICES"] = gpu_ids
elif is_musa_available():
current_env["MUSA_VISIBLE_DEVICES"] = gpu_ids
elif is_npu_available():
current_env["ASCEND_RT_VISIBLE_DEVICES"] = gpu_ids
else:
@ -613,6 +629,7 @@ class PrepareForLaunch:
elif self.distributed_type in (
DistributedType.MULTI_GPU,
DistributedType.MULTI_MLU,
DistributedType.MULTI_MUSA,
DistributedType.MULTI_NPU,
DistributedType.MULTI_XPU,
DistributedType.MULTI_CPU,

View File

@ -25,18 +25,10 @@ from torch.nn.parallel.distributed import DistributedDataParallel as torchDDP
from ..optimizer import AcceleratedOptimizer
from ..scheduler import AcceleratedScheduler
from .imports import is_megatron_lm_available, is_transformers_available
from .imports import is_megatron_lm_available
from .operations import recursively_apply, send_to_device
if is_transformers_available():
from transformers.modeling_outputs import (
CausalLMOutputWithCrossAttentions,
Seq2SeqLMOutput,
SequenceClassifierOutput,
)
if is_megatron_lm_available():
from megatron import (
get_args,
@ -467,6 +459,8 @@ class BertTrainStep(AbstractTrainStep):
if not args.model_return_dict:
self.model_output_class = None
else:
from transformers.modeling_outputs import SequenceClassifierOutput
self.model_output_class = SequenceClassifierOutput
def get_batch_func(self, accelerator, megatron_dataset_flag):
@ -614,6 +608,8 @@ class GPTTrainStep(AbstractTrainStep):
if not args.model_return_dict:
self.model_output_class = None
else:
from transformers.modeling_outputs import CausalLMOutputWithCrossAttentions
self.model_output_class = CausalLMOutputWithCrossAttentions
def get_batch_func(self, accelerator, megatron_dataset_flag):
@ -737,6 +733,8 @@ class T5TrainStep(AbstractTrainStep):
if not args.model_return_dict:
self.model_output_class = None
else:
from transformers.modeling_outputs import Seq2SeqLMOutput
self.model_output_class = Seq2SeqLMOutput
@staticmethod

View File

@ -23,7 +23,29 @@ import inspect
import torch
from .imports import is_mlu_available, is_mps_available, is_npu_available, is_xpu_available
from .imports import is_mlu_available, is_mps_available, is_musa_available, is_npu_available, is_xpu_available
def clear_device_cache(garbage_collection=False):
"""
Clears the device cache by calling `torch.{backend}.empty_cache`. Can also run `gc.collect()`, but do note that
this is a *considerable* slowdown and should be used sparingly.
"""
if garbage_collection:
gc.collect()
if is_xpu_available():
torch.xpu.empty_cache()
elif is_mlu_available():
torch.mlu.empty_cache()
elif is_musa_available():
torch.musa.empty_cache()
elif is_npu_available():
torch.npu.empty_cache()
elif is_mps_available(min_version="2.0"):
torch.mps.empty_cache()
else:
torch.cuda.empty_cache()
def release_memory(*objects):
@ -52,17 +74,7 @@ def release_memory(*objects):
objects = list(objects)
for i in range(len(objects)):
objects[i] = None
gc.collect()
if is_xpu_available():
torch.xpu.empty_cache()
elif is_mlu_available():
torch.mlu.empty_cache()
elif is_npu_available():
torch.npu.empty_cache()
elif is_mps_available(min_version="2.0"):
torch.mps.empty_cache()
else:
torch.cuda.empty_cache()
clear_device_cache(garbage_collection=True)
return objects
@ -118,15 +130,7 @@ def find_executable_batch_size(function: callable = None, starting_batch_size: i
def decorator(*args, **kwargs):
nonlocal batch_size
gc.collect()
if is_xpu_available():
torch.xpu.empty_cache()
elif is_mlu_available():
torch.mlu.empty_cache()
elif is_npu_available():
torch.npu.empty_cache()
else:
torch.cuda.empty_cache()
clear_device_cache(garbage_collection=True)
params = list(inspect.signature(function).parameters.keys())
# Guard against user error
if len(params) < (len(args) + 1):
@ -142,15 +146,7 @@ def find_executable_batch_size(function: callable = None, starting_batch_size: i
return function(batch_size, *args, **kwargs)
except Exception as e:
if should_reduce_batch_size(e):
gc.collect()
if is_xpu_available():
torch.xpu.empty_cache()
elif is_mlu_available():
torch.mlu.empty_cache()
elif is_npu_available():
torch.npu.empty_cache()
else:
torch.cuda.empty_cache()
clear_device_cache(garbage_collection=True)
batch_size //= 2
else:
raise
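With the cache clearing consolidated into `clear_device_cache(garbage_collection=True)`, the retry loop above is unchanged. A hedged usage sketch of the decorator it powers, with a dummy body that "fails" on large batches so the halving is visible (this assumes the helper's out-of-memory message heuristics, e.g. matching "CUDA out of memory."):

```python
from accelerate.utils import find_executable_batch_size


@find_executable_batch_size(starting_batch_size=128)
def train(batch_size):
    # Dummy body: pretend anything above 32 exhausts device memory.
    if batch_size > 32:
        raise RuntimeError("CUDA out of memory.")
    return f"trained with batch_size={batch_size}"


print(train())  # 128 -> 64 -> 32, then succeeds
```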

View File

@ -14,7 +14,6 @@
import contextlib
import gc
import importlib
import inspect
import json
import logging
@ -26,7 +25,6 @@ import warnings
from collections import OrderedDict, defaultdict
from typing import Dict, List, Optional, Tuple, Union
import packaging
import torch
import torch.nn as nn
@ -36,14 +34,16 @@ from .dataclasses import AutocastKwargs, CustomDtype, DistributedType
from .imports import (
is_mlu_available,
is_mps_available,
is_musa_available,
is_npu_available,
is_peft_available,
is_torch_xla_available,
is_xpu_available,
)
from .memory import clear_device_cache
from .offload import load_offloaded_weight, offload_weight, save_offload_index
from .tqdm import is_tqdm_available, tqdm
from .versions import compare_versions
from .versions import compare_versions, is_torch_version
if is_npu_available(check_device=False):
@ -52,6 +52,9 @@ if is_npu_available(check_device=False):
if is_mlu_available(check_device=False):
import torch_mlu # noqa: F401
if is_musa_available(check_device=False):
import torch_musa # noqa: F401
from safetensors import safe_open
from safetensors.torch import load_file as safe_load_file
@ -160,6 +163,8 @@ def dtype_byte_size(dtype: torch.dtype):
return 1 / 2
elif dtype == CustomDtype.FP8:
return 1
elif is_torch_version(">=", "2.1.0") and dtype == torch.float8_e4m3fn:
return 1
bit_search = re.search(r"[^\d](\d+)$", str(dtype))
if bit_search is None:
raise ValueError(f"`dtype` is not a valid dtype: {dtype}.")
@ -358,10 +363,15 @@ def set_module_tensor_to_device(
if old_value.device == torch.device("meta") and device not in ["meta", torch.device("meta")] and value is None:
raise ValueError(f"{tensor_name} is on the meta device, we need a `value` to put in on {device}.")
param = module._parameters[tensor_name] if tensor_name in module._parameters else None
param_cls = type(param)
if value is not None:
if old_value.shape != value.shape:
# We can expect mismatches when using bnb 4bit since Params4bit will reshape and pack the weights.
# In other cases, we want to make sure we're not loading checkpoints that do not match the config.
if old_value.shape != value.shape and param_cls.__name__ != "Params4bit":
raise ValueError(
f'Trying to set a tensor of shape {value.shape} in "{tensor_name}" (which has shape {old_value.shape}), this look incorrect.'
f'Trying to set a tensor of shape {value.shape} in "{tensor_name}" (which has shape {old_value.shape}), this looks incorrect.'
)
if dtype is None:
@ -370,9 +380,6 @@ def set_module_tensor_to_device(
elif not str(value.dtype).startswith(("torch.uint", "torch.int", "torch.bool")):
value = value.to(dtype)
param = module._parameters[tensor_name] if tensor_name in module._parameters else None
param_cls = type(param)
device_quantization = None
with torch.no_grad():
# leave it on cpu first before moving them to cuda
@ -391,8 +398,12 @@ def set_module_tensor_to_device(
device = f"npu:{device}"
elif is_mlu_available():
device = f"mlu:{device}"
elif is_musa_available():
device = f"musa:{device}"
elif is_xpu_available():
device = f"xpu:{device}"
if "xpu" in str(device) and not is_xpu_available():
raise ValueError(f'{device} is not available, you should use device="cpu" instead')
if value is None:
new_value = old_value.to(device)
if dtype is not None and device in ["meta", torch.device("meta")]:
@ -412,7 +423,7 @@ def set_module_tensor_to_device(
elif value is not None or not check_device_same(torch.device(device), module._parameters[tensor_name].device):
param_cls = type(module._parameters[tensor_name])
kwargs = module._parameters[tensor_name].__dict__
if param_cls.__name__ in ["Int8Params", "FP4Params"]:
if param_cls.__name__ in ["Int8Params", "FP4Params", "Params4bit"]:
if param_cls.__name__ == "Int8Params" and new_value.dtype == torch.float32:
# downcast to fp16 if any - needed for 8bit serialization
new_value = new_value.to(torch.float16)
@ -458,14 +469,7 @@ def set_module_tensor_to_device(
module.weight = module.weight.cuda(device_index)
# clean pre and post foward hook
if device != "cpu":
if is_npu_available():
torch.npu.empty_cache()
elif is_mlu_available():
torch.mlu.empty_cache()
elif is_xpu_available():
torch.xpu.empty_cache()
else:
torch.cuda.empty_cache()
clear_device_cache()
# When handling tied weights, we update tied_params_map to keep track of the tied weights that have already been allocated on the device in
# order to avoid duplicating memory, see above.
@ -830,6 +834,14 @@ def get_max_memory(max_memory: Optional[Dict[Union[int, str], Union[int, str]]]
except Exception:
logger.info(f"Device {i} seems unavailable, Proceeding to check subsequent devices.")
continue
elif is_musa_available():
for i in range(torch.musa.device_count()):
try:
_ = torch.tensor(0, device=torch.device("musa", i))
max_memory[i] = torch.musa.mem_get_info(i)[0]
except Exception:
logger.info(f"Device {i} seems unavailable, Proceeding to check subsequent devices.")
continue
elif is_xpu_available():
for i in range(torch.xpu.device_count()):
try:
@ -866,6 +878,8 @@ def get_max_memory(max_memory: Optional[Dict[Union[int, str], Union[int, str]]]
num_devices = torch.npu.device_count()
elif is_mlu_available():
num_devices = torch.mlu.device_count()
elif is_musa_available():
num_devices = torch.musa.device_count()
elif is_xpu_available():
num_devices = torch.xpu.device_count()
else:
@ -993,6 +1007,8 @@ def get_balanced_memory(
expected_device_type = "npu"
elif is_mlu_available():
expected_device_type = "mlu"
elif is_musa_available():
expected_device_type = "musa"
elif is_xpu_available():
expected_device_type = "xpu"
else:
@ -1456,7 +1472,15 @@ def load_state_dict(checkpoint_file, device_map=None):
else:
# if we only have one device we can load everything directly
if len(set(device_map.values())) == 1:
return safe_load_file(checkpoint_file, device=list(device_map.values())[0])
device = list(device_map.values())[0]
target_device = device
if is_xpu_available():
if compare_versions("safetensors", "<", "0.4.2"):
raise ImportError("Safetensors version must be >= 0.4.2 for XPU. Please upgrade safetensors.")
if isinstance(device, int):
target_device = f"xpu:{device}"
return safe_load_file(checkpoint_file, device=target_device)
devices = list(set(device_map.values()) - {"disk"})
# cpu device should always exist as fallback option
@ -1486,15 +1510,9 @@ def load_state_dict(checkpoint_file, device_map=None):
progress_bar = None
for device in devices:
target_device = device
if is_xpu_available():
current_safetensors_version = packaging.version.parse(importlib.metadata.version("safetensors"))
if compare_versions(current_safetensors_version, "<", "0.4.2"):
raise ModuleNotFoundError(
f"You need at least safetensors 0.4.2 for Intel GPU, while you have {current_safetensors_version}"
)
if compare_versions("safetensors", "<", "0.4.2"):
raise ImportError("Safetensors version must be >= 0.4.2 for XPU. Please upgrade safetensors.")
if isinstance(device, int):
target_device = f"xpu:{device}"
@ -1857,6 +1875,7 @@ def get_mixed_precision_context_manager(native_amp: bool = False, autocast_kwarg
DistributedType.MULTI_CPU,
DistributedType.MULTI_GPU,
DistributedType.MULTI_MLU,
DistributedType.MULTI_MUSA,
DistributedType.MULTI_NPU,
DistributedType.MULTI_XPU,
DistributedType.FSDP,

View File

@ -151,9 +151,6 @@ def send_to_device(tensor, device, non_blocking=False, skip_keys=None):
device = "npu:0"
if device == "xpu":
device = "xpu:0"
# TODO: torch_mlu LongTensor.to(<int num>) has bugs, we will fix this later.
if is_torch_tensor(tensor) and tensor.device.type in ["mlu"] and tensor.dtype in [torch.int64]:
tensor = tensor.cpu()
try:
return tensor.to(device, non_blocking=non_blocking)
except TypeError: # .to() doesn't accept non_blocking as kwarg

View File

@ -21,7 +21,7 @@ import torch
from ..state import AcceleratorState
from .constants import CUDA_DISTRIBUTED_TYPES
from .dataclasses import DistributedType, RNGType
from .imports import is_mlu_available, is_npu_available, is_torch_xla_available, is_xpu_available
from .imports import is_mlu_available, is_musa_available, is_npu_available, is_torch_xla_available, is_xpu_available
if is_torch_xla_available():
@ -51,6 +51,8 @@ def set_seed(seed: int, device_specific: bool = False, deterministic: bool = Fal
torch.npu.manual_seed_all(seed)
elif is_mlu_available():
torch.mlu.manual_seed_all(seed)
elif is_musa_available():
torch.musa.manual_seed_all(seed)
else:
torch.cuda.manual_seed_all(seed)
# ^^ safe to call this function even if cuda is not available
@ -76,6 +78,9 @@ def synchronize_rng_state(rng_type: Optional[RNGType] = None, generator: Optiona
elif rng_type == RNGType.MLU:
assert is_mlu_available(), "Can't synchronize MLU seeds on an environment without MLUs."
rng_state = torch.mlu.get_rng_state()
elif rng_type == RNGType.MUSA:
assert is_musa_available(), "Can't synchronize MUSA seeds on an environment without MUSAs."
rng_state = torch.musa.get_rng_state()
elif rng_type == RNGType.XPU:
assert is_xpu_available(), "Can't synchronize XPU seeds on an environment without XPUs."
rng_state = torch.xpu.get_rng_state()
@ -93,6 +98,7 @@ def synchronize_rng_state(rng_type: Optional[RNGType] = None, generator: Optiona
elif (
state.distributed_type in CUDA_DISTRIBUTED_TYPES
or state.distributed_type == DistributedType.MULTI_MLU
or state.distributed_type == DistributedType.MULTI_MUSA
or state.distributed_type == DistributedType.MULTI_NPU
or state.distributed_type == DistributedType.MULTI_XPU
):
@ -111,6 +117,8 @@ def synchronize_rng_state(rng_type: Optional[RNGType] = None, generator: Optiona
torch.npu.set_rng_state(rng_state)
elif rng_type == RNGType.MLU:
torch.mlu.set_rng_state(rng_state)
elif rng_type == RNGType.MUSA:
torch.musa.set_rng_state(rng_state)
elif rng_type == RNGType.XPU:
torch.xpu.set_rng_state(rng_state)
elif rng_type == RNGType.XLA:

View File

@ -15,6 +15,7 @@ import json
import os
import pickle
import tempfile
import time
from unittest.mock import patch
import psutil
@ -32,8 +33,20 @@ from accelerate.utils import patch_environment
from accelerate.utils.modeling import get_state_dict_from_offload, load_checkpoint_in_model
def create_components():
model = torch.nn.Linear(2, 4)
class ModelWithTiedWeights(torch.nn.Module):
def __init__(self):
super().__init__()
self.linear1 = torch.nn.Linear(2, 4)
self.linear2 = torch.nn.Linear(4, 2)
self.linear2.weight = self.linear1.weight
self.linear2.bias = self.linear1.bias
def forward(self, x):
return self.linear2(self.linear1(x))
def create_components(tied_weights=False):
model = ModelWithTiedWeights() if tied_weights else torch.nn.Linear(2, 4)
optimizer = torch.optim.AdamW(model.parameters(), lr=1.0)
scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr=0.01, steps_per_epoch=2, epochs=1)
train_dl = DataLoader(TensorDataset(torch.tensor([1, 2, 3])))
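`ModelWithTiedWeights` exercises the `accelerator.load_state()` fix for shared tensors. A plain-torch round trip illustrating the invariant the new test checks, namely that tying survives a state-dict save/load (embedding/head tying is used here just so the shapes line up):

```python
import torch


class TiedModel(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.embed = torch.nn.Embedding(10, 4)
        self.head = torch.nn.Linear(4, 10, bias=False)
        self.head.weight = self.embed.weight  # tie input embedding and output head

    def forward(self, idx):
        return self.head(self.embed(idx))


model = TiedModel()
state = model.state_dict()
# Both keys reference the same storage before saving...
assert state["embed.weight"].data_ptr() == state["head.weight"].data_ptr()

reloaded = TiedModel()
reloaded.load_state_dict(state)
# ...and the tie still holds after loading, which is what the signature comparison
# in the new test asserts for accelerator.save_state()/load_state().
assert reloaded.head.weight.data_ptr() == reloaded.embed.weight.data_ptr()
print("tied weights preserved through a state_dict round trip")
```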
@ -54,11 +67,14 @@ class ModelForTest(torch.nn.Module):
def get_signature(model):
return (model.weight.abs().sum() + model.bias.abs().sum()).item()
return sum(param.abs().sum().item() for param in model.parameters())
def load_random_weights(model):
state = torch.nn.Linear(*tuple(model.weight.T.shape)).state_dict()
if isinstance(model, torch.nn.Linear):
state = torch.nn.Linear(*tuple(model.weight.T.shape)).state_dict()
elif isinstance(model, ModelWithTiedWeights):
state = ModelWithTiedWeights().state_dict()
model.load_state_dict(state)
@ -66,6 +82,7 @@ def parameterized_custom_name_func(func, param_num, param):
# customize the test name generator function as we want both params to appear in the sub-test
# name, as by default it shows only the first param
param_based_name = "use_safetensors" if param.args[0] is True else "use_pytorch"
param_based_name += "_tied_weights" if (len(param.args) == 2 and param.args[1] is True) else ""
return f"{func.__name__}_{param_based_name}"
@ -204,6 +221,10 @@ class AcceleratorTester(AccelerateTestCase):
model, optimizer, scheduler, train_dl, valid_dl = accelerator.prepare(
model, optimizer, scheduler, train_dl, valid_dl
)
# Short sleep here makes this test more reliable
time.sleep(1e-3)
model, optimizer, scheduler, train_dl, valid_dl = accelerator.free_memory(
model, optimizer, scheduler, train_dl, valid_dl
)
@ -230,10 +251,10 @@ class AcceleratorTester(AccelerateTestCase):
accelerator = Accelerator()
assert str(accelerator.state.device) == "cuda:64"
@parameterized.expand((True, False), name_func=parameterized_custom_name_func)
def test_save_load_model(self, use_safetensors):
@parameterized.expand([(True, True), (True, False), (False, False)], name_func=parameterized_custom_name_func)
def test_save_load_model(self, use_safetensors, tied_weights):
accelerator = Accelerator()
model, optimizer, scheduler, train_dl, valid_dl = create_components()
model, optimizer, scheduler, train_dl, valid_dl = create_components(tied_weights)
accelerator.prepare(model, optimizer, scheduler, train_dl, valid_dl)
model_signature = get_signature(model)
@ -298,7 +319,7 @@ class AcceleratorTester(AccelerateTestCase):
assert torch.allclose(expected, output, atol=1e-5)
@parameterized.expand([True, False], name_func=parameterized_custom_name_func)
@require_cuda
@require_non_cpu
def test_get_state_dict_from_offload(self, use_safetensors):
accelerator = Accelerator()
@ -312,18 +333,18 @@ class AcceleratorTester(AccelerateTestCase):
cpu_onloaded_layer = get_state_dict_from_offload(
model.linear2, "linear2.weight", {"linear2.weight": ""}, device_to_put_offload="cpu"
)
cuda_onloaded_layer = get_state_dict_from_offload(
device_onloaded_layer = get_state_dict_from_offload(
model.linear2, "linear2.weight", {"linear2.weight": ""}, device_to_put_offload=0
)
cpu_onloaded_layer_weight = cpu_onloaded_layer["linear2.weight"]
cuda_onloaded_layer_weight = cuda_onloaded_layer["linear2.weight"]
device_onloaded_layer_weight = device_onloaded_layer["linear2.weight"]
assert torch.allclose(offloaded_layer_weight, cpu_onloaded_layer_weight)
assert torch.allclose(
offloaded_layer_weight, cuda_onloaded_layer_weight.to("cpu")
offloaded_layer_weight, device_onloaded_layer_weight.to("cpu")
) # must be on the same device for torch.allclose()
assert cpu_onloaded_layer_weight.device.type == "cpu"
assert cuda_onloaded_layer_weight.device.type == "cuda"
assert device_onloaded_layer_weight.device.type == torch_device
@parameterized.expand([True, False], name_func=parameterized_custom_name_func)
def test_save_load_model_with_hooks(self, use_safetensors):

View File

@ -655,7 +655,7 @@ class BigModelingTester(unittest.TestCase):
with self.assertRaises(RuntimeError):
model.to(0)
@require_multi_gpu
@require_multi_device
def test_dispatch_model_move_model_warning(self):
model = ModelForTest()
device_map = {"linear1": 0, "batchnorm": 0, "linear2": 1}
@ -664,7 +664,7 @@ class BigModelingTester(unittest.TestCase):
with self.assertLogs("accelerate.big_modeling", level="WARNING"):
model.to("cpu")
with self.assertLogs("accelerate.big_modeling", level="WARNING"):
model.cuda(0)
model.to(torch_device)
with self.assertRaises(RuntimeError):
x = torch.randn(2, 3)
model(x)

View File

@ -28,6 +28,7 @@ from accelerate.test_utils.testing import (
TempDirTestCase,
get_launch_command,
require_huggingface_suite,
require_multi_device,
require_multi_gpu,
require_pippy,
require_schedulefree,
@ -55,6 +56,7 @@ EXCLUDE_EXAMPLES = [
"megatron_lm_gpt_pretraining.py",
"early_stopping.py",
"ddp_comm_hook.py",
"profiler.py",
]
@ -248,17 +250,21 @@ class FeatureExamplesTests(TempDirTestCase):
testargs = ["examples/by_feature/early_stopping.py"]
run_command(self.launch_args + testargs)
@require_multi_gpu
def test_profiler(self):
testargs = ["examples/by_feature/profiler.py"]
run_command(self.launch_args + testargs)
@require_multi_device
def test_ddp_comm_hook(self):
testargs = ["examples/by_feature/ddp_comm_hook.py", "--ddp_comm_hook", "fp16"]
run_command(self.launch_args + testargs)
@require_multi_gpu
@require_multi_device
def test_distributed_inference_examples_stable_diffusion(self):
testargs = ["examples/inference/distributed/stable_diffusion.py"]
run_command(self.launch_args + testargs)
@require_multi_gpu
@require_multi_device
def test_distributed_inference_examples_phi2(self):
testargs = ["examples/inference/distributed/phi2.py"]
run_command(self.launch_args + testargs)

tests/test_imports.py (new file, 83 lines)
View File

@ -0,0 +1,83 @@
# Copyright 2024 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import subprocess
from accelerate.test_utils.testing import TempDirTestCase, require_import_timer
from accelerate.utils import is_import_timer_available
if is_import_timer_available():
from import_timer import calculate_total_time, read_import_profile
from import_timer.core import get_paths_above_threshold, sort_nodes_by_total_time
def convert_list_to_string(data):
end_result = ""
arrow_right = "->"
for path in data:
end_result += f"{arrow_right.join(path[0])} {path[1]:.3f}s\n"
return end_result
def run_import_time(command: str):
output = subprocess.run(["python3", "-X", "importtime", "-c", command], capture_output=True, text=True)
return output.stderr
@require_import_timer
class ImportSpeedTester(TempDirTestCase):
"""
Test suite which checks if imports have seen slowdowns
based on a particular baseline.
If the error messages are not clear enough to get a
full view of what is slowing things down (or to
figure out how deep the initial depth should be),
please view the profile with the `tuna` framework:
`tuna import.log`.
"""
clear_on_setup = False
@classmethod
def setUpClass(cls):
super().setUpClass()
output = run_import_time("import torch")
data = read_import_profile(output)
total_time = calculate_total_time(data)
cls.pytorch_time = total_time
def test_base_import(self):
output = run_import_time("import accelerate")
data = read_import_profile(output)
total_time = calculate_total_time(data)
pct_more = (total_time - self.pytorch_time) / self.pytorch_time * 100
# Base import should never be more than 20% slower than raw torch import
err_msg = f"Base import is more than 20% slower than raw torch import ({pct_more:.2f}%), please check the attached `tuna` profile:\n"
sorted_data = sort_nodes_by_total_time(data)
paths_above_threshold = get_paths_above_threshold(sorted_data, 0.05, max_depth=7)
err_msg += f"\n{convert_list_to_string(paths_above_threshold)}"
self.assertLess(pct_more, 20, err_msg)
def test_cli_import(self):
output = run_import_time("from accelerate.commands.launch import launch_command_parser")
data = read_import_profile(output)
total_time = calculate_total_time(data)
pct_more = (total_time - self.pytorch_time) / self.pytorch_time * 100
# CLI import should never be more than 20% slower than raw torch import
err_msg = f"CLI import is more than 20% slower than raw torch import ({pct_more:.2f}%), please check the attached `tuna` profile:\n"
sorted_data = sort_nodes_by_total_time(data)
paths_above_threshold = get_paths_above_threshold(sorted_data, 0.05, max_depth=7)
err_msg += f"\n{convert_list_to_string(paths_above_threshold)}"
self.assertLess(pct_more, 20, err_msg)

View File

@ -29,7 +29,8 @@ from accelerate.test_utils import (
require_non_cpu,
require_non_xpu,
)
from accelerate.utils import AutocastKwargs, KwargsHandler, TorchDynamoPlugin, clear_environment
from accelerate.test_utils.testing import slow
from accelerate.utils import AutocastKwargs, KwargsHandler, ProfileKwargs, TorchDynamoPlugin, clear_environment
from accelerate.utils.dataclasses import DistributedType
@ -96,6 +97,52 @@ class KwargsHandlerTester(unittest.TestCase):
# We should be back in fp16
assert g_float16.dtype == torch.float16
@slow
def test_profile_kwargs(self):
# Arrange
schedule_options = [
dict(wait=1, warmup=1, active=2, repeat=1),
dict(wait=2, warmup=2, active=2, repeat=2),
dict(wait=0, warmup=1, active=3, repeat=3, skip_first=1),
dict(wait=3, warmup=2, active=1, repeat=1, skip_first=2),
dict(wait=1, warmup=0, active=1, repeat=5),
]
total_steps = 100
for option in schedule_options:
count = 0
table_outputs = []
steps_per_cycle = option["wait"] + option["warmup"] + option["active"]
effective_steps = max(0, total_steps - option.get("skip_first", 0))
cycles = effective_steps // steps_per_cycle
if option["repeat"] > 0:
expected_count = min(cycles, option["repeat"])
else:
expected_count = cycles
def on_trace_ready(prof):
nonlocal count
nonlocal table_outputs
count += 1
table_outputs.append(prof.key_averages().table(sort_by="cpu_time_total", row_limit=-1))
kwargs = ProfileKwargs(activities=["cpu"], on_trace_ready=on_trace_ready, schedule_option=option)
accelerator = Accelerator(kwargs_handlers=[kwargs])
# Act
with accelerator.profile() as prof:
for _ in range(total_steps):
prof.step()
torch.tensor([1, 2, 3, 4, 5], device=accelerator.device)
# Assert
assert isinstance(prof, torch.profiler.profile)
assert count == expected_count, f"Option: {option}, Expected count: {expected_count}, but got {count}"
for output in table_outputs:
self.assertIn("CPU time total:", output)
def test_torch_dynamo_plugin(self):
with clear_environment():
prefix = "ACCELERATE_DYNAMO_"

View File

@ -26,7 +26,13 @@ from parameterized import parameterized
from safetensors.torch import save_file
from accelerate import init_empty_weights
from accelerate.test_utils import require_cuda, require_huggingface_suite, require_multi_gpu
from accelerate.test_utils import (
require_cuda,
require_huggingface_suite,
require_multi_device,
require_non_cpu,
torch_device,
)
from accelerate.utils.modeling import (
check_device_map,
clean_device_map,
@ -44,6 +50,9 @@ from accelerate.utils.modeling import (
)
torch_device = f"{torch_device}:0" if torch_device != "cpu" else "cpu"
class ModelForTest(nn.Module):
def __init__(self):
super().__init__()
@@ -150,20 +159,20 @@ class ModelingUtilsTester(unittest.TestCase):
model = ModelForTest()
self.check_set_module_tensor_for_device(model, "cpu", "meta")
@require_cuda
@require_non_cpu
def test_set_module_tensor_to_cpu_and_gpu(self):
model = ModelForTest()
self.check_set_module_tensor_for_device(model, "cpu", 0)
self.check_set_module_tensor_for_device(model, "cpu", torch_device)
@require_cuda
@require_non_cpu
def test_set_module_tensor_to_meta_and_gpu(self):
model = ModelForTest().to(0)
self.check_set_module_tensor_for_device(model, 0, "meta")
model = ModelForTest().to(torch_device)
self.check_set_module_tensor_for_device(model, torch_device, "meta")
@require_multi_gpu
@require_multi_device
def test_set_module_tensor_between_gpus(self):
model = ModelForTest().to(0)
self.check_set_module_tensor_for_device(model, 0, 1)
model = ModelForTest().to(torch_device)
self.check_set_module_tensor_for_device(model, torch_device, torch_device.replace("0", "1"))
def test_set_module_tensor_sets_dtype(self):
model = ModelForTest()
@@ -177,7 +186,7 @@ class ModelingUtilsTester(unittest.TestCase):
set_module_tensor_to_device(model, "linear1.weight", "cpu", value=tensor)
assert (
str(cm.exception)
== 'Trying to set a tensor of shape torch.Size([2, 2]) in "weight" (which has shape torch.Size([4, 3])), this look incorrect.'
== 'Trying to set a tensor of shape torch.Size([2, 2]) in "weight" (which has shape torch.Size([4, 3])), this looks incorrect.'
)
def test_named_tensors(self):
@@ -361,7 +370,7 @@ class ModelingUtilsTester(unittest.TestCase):
self.shard_test_model(model, tmp_dir)
load_checkpoint_in_model(model, tmp_dir)
@require_cuda
@require_non_cpu
def test_load_checkpoint_in_model_one_gpu(self):
device_map = {"linear1": 0, "batchnorm": "cpu", "linear2": "cpu"}
@@ -371,7 +380,7 @@ class ModelingUtilsTester(unittest.TestCase):
fname = os.path.join(tmp_dir, "pt_model.bin")
torch.save(model.state_dict(), fname)
load_checkpoint_in_model(model, fname, device_map=device_map)
assert model.linear1.weight.device == torch.device(0)
assert model.linear1.weight.device == torch.device(torch_device)
assert model.batchnorm.weight.device == torch.device("cpu")
assert model.linear2.weight.device == torch.device("cpu")
@@ -382,7 +391,7 @@ class ModelingUtilsTester(unittest.TestCase):
index_file = os.path.join(tmp_dir, "weight_map.index.json")
load_checkpoint_in_model(model, index_file, device_map=device_map)
assert model.linear1.weight.device == torch.device(0)
assert model.linear1.weight.device == torch.device(torch_device)
assert model.batchnorm.weight.device == torch.device("cpu")
assert model.linear2.weight.device == torch.device("cpu")
@@ -392,11 +401,11 @@ class ModelingUtilsTester(unittest.TestCase):
self.shard_test_model(model, tmp_dir)
load_checkpoint_in_model(model, tmp_dir, device_map=device_map)
assert model.linear1.weight.device == torch.device(0)
assert model.linear1.weight.device == torch.device(torch_device)
assert model.batchnorm.weight.device == torch.device("cpu")
assert model.linear2.weight.device == torch.device("cpu")
@require_cuda
@require_non_cpu
def test_load_checkpoint_in_model_disk_offload(self):
device_map = {"linear1": "cpu", "batchnorm": "disk", "linear2": "cpu"}
@@ -421,7 +430,7 @@ class ModelingUtilsTester(unittest.TestCase):
assert model.batchnorm.running_mean.device == torch.device("meta")
assert model.linear2.weight.device == torch.device("cpu")
@require_multi_gpu
@require_multi_device
def test_load_checkpoint_in_model_two_gpu(self):
device_map = {"linear1": 0, "batchnorm": "cpu", "linear2": 1}
@@ -431,9 +440,9 @@ class ModelingUtilsTester(unittest.TestCase):
fname = os.path.join(tmp_dir, "pt_model.bin")
torch.save(model.state_dict(), fname)
load_checkpoint_in_model(model, fname, device_map=device_map)
assert model.linear1.weight.device == torch.device(0)
assert model.linear1.weight.device == torch.device(torch_device)
assert model.batchnorm.weight.device == torch.device("cpu")
assert model.linear2.weight.device == torch.device(1)
assert model.linear2.weight.device == torch.device(torch_device.replace("0", "1"))
# Check with sharded index
model = ModelForTest()
@@ -442,9 +451,9 @@ class ModelingUtilsTester(unittest.TestCase):
index_file = os.path.join(tmp_dir, "weight_map.index.json")
load_checkpoint_in_model(model, index_file, device_map=device_map)
assert model.linear1.weight.device == torch.device(0)
assert model.linear1.weight.device == torch.device(torch_device)
assert model.batchnorm.weight.device == torch.device("cpu")
assert model.linear2.weight.device == torch.device(1)
assert model.linear2.weight.device == torch.device(torch_device.replace("0", "1"))
# Check with sharded checkpoint
model = ModelForTest()
@@ -452,9 +461,9 @@ class ModelingUtilsTester(unittest.TestCase):
self.shard_test_model(model, tmp_dir)
load_checkpoint_in_model(model, tmp_dir, device_map=device_map)
assert model.linear1.weight.device == torch.device(0)
assert model.linear1.weight.device == torch.device(torch_device)
assert model.batchnorm.weight.device == torch.device("cpu")
assert model.linear2.weight.device == torch.device(1)
assert model.linear2.weight.device == torch.device(torch_device.replace("0", "1"))
def test_load_checkpoint_in_model_dtype(self):
with tempfile.NamedTemporaryFile(suffix=".pt") as tmpfile:
@@ -725,7 +734,7 @@ class ModelingUtilsTester(unittest.TestCase):
max_memory = get_balanced_memory(model, max_memory={0: 0, "cpu": 100})
assert {0: 0, "cpu": 100} == max_memory
@require_cuda
@require_non_cpu
def test_load_state_dict(self):
state_dict = {k: torch.randn(4, 5) for k in ["a", "b", "c"]}
device_maps = [{"a": "cpu", "b": 0, "c": "disk"}, {"a": 0, "b": 0, "c": "disk"}, {"a": 0, "b": 0, "c": 0}]
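The modeling-utils hunks above replace hard-coded CUDA indices with the `torch_device` string exported by `accelerate.test_utils`, pinned to `"<backend>:0"` so assertions against parameter devices compare exactly and a second accelerator can be addressed via `torch_device.replace("0", "1")`. A small standalone sketch of that pattern combined with `load_checkpoint_in_model`; the CUDA-availability probe and the `nn.Sequential` stand-in model are illustrative, not the test suite's own fixtures:

```python
# Sketch of the pattern introduced above: pin torch_device to an explicit ":0"
# index, then reuse it in device_map entries and in device assertions.
import os
import tempfile

import torch
from torch import nn

from accelerate.utils.modeling import load_checkpoint_in_model

# Illustrative probe; the tests derive torch_device from accelerate.test_utils.
torch_device = "cuda" if torch.cuda.is_available() else "cpu"
torch_device = f"{torch_device}:0" if torch_device != "cpu" else "cpu"

# Submodules of nn.Sequential are named "0", "1", "2", so they can key a device_map.
model = nn.Sequential(nn.Linear(3, 4), nn.BatchNorm1d(4), nn.Linear(4, 5))
device_map = {"0": torch_device, "1": "cpu", "2": "cpu"}

with tempfile.TemporaryDirectory() as tmp_dir:
    fname = os.path.join(tmp_dir, "pt_model.bin")
    torch.save(model.state_dict(), fname)
    load_checkpoint_in_model(model, fname, device_map=device_map)

# Pinning ":0" makes the comparison exact on accelerators, where weight.device
# always reports an index.
assert model[0].weight.device == torch.device(torch_device)
assert model[1].weight.device == torch.device("cpu")
```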

View File

@@ -32,6 +32,7 @@ from accelerate.test_utils import (
require_non_torch_xla,
require_pippy,
require_torchvision,
torch_device,
)
from accelerate.utils import patch_environment
@@ -72,7 +73,7 @@ class MultiDeviceTester(unittest.TestCase):
execute_subprocess_async(cmd)
@require_non_torch_xla
@require_multi_gpu
@require_multi_device
def test_distributed_data_loop(self):
"""
This TestCase checks the behaviour that occurs during distributed training or evaluation,
@@ -80,7 +81,16 @@
"""
print(f"Found {device_count} devices, using 2 devices only")
cmd = get_launch_command(num_processes=2) + [self.data_loop_file_path]
with patch_environment(omp_num_threads=1, cuda_visible_devices="0,1"):
env_kwargs = dict(omp_num_threads=1)
if torch_device == "xpu":
env_kwargs.update(ze_affinity_mask="0,1")
elif torch_device == "npu":
env_kwargs.update(ascend_rt_visible_devices="0,1")
elif torch_device == "mlu":
env_kwargs.update(mlu_visible_devices="0,1")
else:
env_kwargs.update(cuda_visible_devices="0,1")
with patch_environment(**env_kwargs):
execute_subprocess_async(cmd)
@require_multi_gpu
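The hunk above swaps the hard-coded `cuda_visible_devices` for the device-visibility variable matching the detected backend (`ZE_AFFINITY_MASK` on XPU, `ASCEND_RT_VISIBLE_DEVICES` on NPU, `MLU_VISIBLE_DEVICES` on MLU). A sketch of the same dispatch factored into a helper; the helper name and the "anything else falls back to CUDA" behaviour are mine, the variable names come from the diff:

```python
# Sketch of the backend -> visibility-variable dispatch shown above.
# patch_environment upper-cases these keys, so ze_affinity_mask -> ZE_AFFINITY_MASK.
def visible_devices_env(torch_device: str, devices: str = "0,1") -> dict:
    """Return patch_environment kwargs that restrict the launch to `devices`."""
    if torch_device == "xpu":
        return {"ze_affinity_mask": devices}
    if torch_device == "npu":
        return {"ascend_rt_visible_devices": devices}
    if torch_device == "mlu":
        return {"mlu_visible_devices": devices}
    # CUDA (and, in this sketch, any other backend) uses CUDA_VISIBLE_DEVICES.
    return {"cuda_visible_devices": devices}


env_kwargs = dict(omp_num_threads=1, **visible_devices_env("xpu"))
# -> {"omp_num_threads": 1, "ze_affinity_mask": "0,1"}
```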

View File

@@ -27,12 +27,12 @@ from torch import nn
from accelerate.state import PartialState
from accelerate.test_utils.testing import (
require_cuda,
require_huggingface_suite,
require_non_cpu,
require_non_torch_xla,
require_torch_min_version,
require_tpu,
require_triton,
torch_device,
)
from accelerate.test_utils.training import RegressionModel
@@ -190,15 +190,16 @@ class UtilsTester(unittest.TestCase):
model = extract_model_from_parallel(model, keep_fp32_wrapper=False)
_ = pickle.dumps(model)
@require_cuda
@require_triton
@require_non_cpu
@require_torch_min_version(version="2.0")
def test_dynamo(self):
model = RegressionModel()
model._original_forward = model.forward
model.forward = torch.cuda.amp.autocast(dtype=torch.float16)(model.forward)
model.forward = torch.autocast(device_type=torch_device, dtype=torch.float16)(model.forward)
model.forward = convert_outputs_to_fp32(model.forward)
model.forward = torch.compile(model.forward, backend="inductor")
inputs = torch.randn(4, 10).cuda()
inputs = torch.randn(4, 10).to(torch_device)
_ = model(inputs)
def test_extract_model(self):
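The final hunk makes `test_dynamo` device-agnostic by switching from the CUDA-only `torch.cuda.amp.autocast` to the generic `torch.autocast(device_type=...)` and by moving inputs with `.to(torch_device)`. A minimal standalone sketch of that pattern, assuming a working inductor backend (the original test additionally gates on `require_triton` and a minimum torch version); the availability probe and the CPU dtype fallback below are illustrative choices, not the test's:

```python
# Device-agnostic autocast + torch.compile, mirroring the change above.
import torch

from accelerate.test_utils.training import RegressionModel
from accelerate.utils import convert_outputs_to_fp32

# Illustrative probe; the test suite gets torch_device from accelerate.test_utils.
torch_device = "cuda" if torch.cuda.is_available() else "cpu"

model = RegressionModel()
model._original_forward = model.forward
# torch.autocast(device_type=...) works for "cuda", "xpu", "cpu", ...; use
# bfloat16 on CPU, where float16 autocast is not generally supported.
dtype = torch.float16 if torch_device != "cpu" else torch.bfloat16
model.forward = torch.autocast(device_type=torch_device, dtype=dtype)(model.forward)
model.forward = convert_outputs_to_fp32(model.forward)
model.forward = torch.compile(model.forward, backend="inductor")

inputs = torch.randn(4, 10).to(torch_device)
_ = model(inputs)
```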