Mirror of https://github.com/huggingface/accelerate.git, synced 2025-11-17 16:04:35 +08:00

Compare commits: 31 commits, cp-dataloa...v1.9.0
| SHA1 |
|---|
| 5565412840 |
| 12f89bb754 |
| 348aabaaaf |
| 3b13453bbf |
| 0408ab12d7 |
| 55e518a762 |
| 7e11ac43f0 |
| e2cc537db8 |
| 847ae58c74 |
| 6e104f31de |
| 524e5f9828 |
| d6c986c3f2 |
| 1ac8643df7 |
| 07ce74868c |
| 175fe91589 |
| fe16ce8bce |
| 5987d79a53 |
| 31af8d4e8e |
| b7493a82b1 |
| a16d2bb3c1 |
| cac22ed980 |
| be826a6b7b |
| 5939640829 |
| 7f9c8cbe34 |
| 9888c7ed23 |
| 42a68c30dc |
| 6597dae780 |
| 8878d93745 |
| 2eaf5cdbbc |
| 23c1d8db89 |
| 0af621bbec |
.github/workflows/gaudi3_scheduled.yml (vendored): 17 changes
@@ -15,7 +15,7 @@ jobs:
     group: itac-bm-emr-gaudi3-dell-2gaudi
     container:
-      image: docker://vault.habana.ai/gaudi-docker/1.20.0/ubuntu22.04/habanalabs/pytorch-installer-2.6.0:latest
+      image: docker://vault.habana.ai/gaudi-docker/1.21.1/ubuntu22.04/habanalabs/pytorch-installer-2.6.0:latest
       options: --runtime=habana --shm-size=64G --cap-add=sys_nice --env HABANA_VISIBLE_DEVICES
     env:
       OMPI_MCA_btl_vader_single_copy_mechanism: none
@@ -66,16 +66,21 @@ jobs:
         run: |
           make test_big_modeling

+      - name: Run FSDP integration tests
+        if: ${{ !cancelled() && (success() || failure()) }}
+        run: |
+          make test_fsdp
+
       - name: Run DeepSpeed integration tests
         if: ${{ !cancelled() && (success() || failure()) }}
         run: |
           make test_deepspeed

-      - name: Run FSDP integration tests
-        if: ${{ !cancelled() && (success() || failure()) }}
-        run: |
-          make test_fsdp
-
+      - name: Run TP integration tests
+        if: ${{ !cancelled() && (success() || failure()) }}
+        run: |
+          make test_tp
+
       - name: Run Examples tests
         if: ${{ !cancelled() && (success() || failure()) }}
         run: |
Makefile: 17 changes
@@ -23,16 +23,23 @@ style:
 	doc-builder style src/accelerate docs/source --max_len 119

 # Run tests for the library
-test_big_modeling:
-	python -m pytest -s -v ./tests/test_big_modeling.py ./tests/test_modeling_utils.py $(if $(IS_GITHUB_CI),--report-log "$(PYTORCH_VERSION)_big_modeling.log",)
-
 test_core:
-	python -m pytest -s -v ./tests/ --ignore=./tests/test_examples.py --ignore=./tests/deepspeed --ignore=./tests/test_big_modeling.py \
-	--ignore=./tests/fsdp --ignore=./tests/tp --ignore=./tests/test_cli.py $(if $(IS_GITHUB_CI),--report-log "$(PYTORCH_VERSION)_core.log",)
+	python -m pytest -s -v ./tests/ \
+	--ignore=./tests/test_big_modeling.py \
+	--ignore=./tests/test_modeling_utils.py \
+	--ignore=./tests/test_examples.py \
+	--ignore=./tests/test_cli.py \
+	--ignore=./tests/deepspeed \
+	--ignore=./tests/fsdp \
+	--ignore=./tests/tp \
+	$(if $(IS_GITHUB_CI),--report-log "$(PYTORCH_VERSION)_core.log",)

 test_cli:
 	python -m pytest -s -v ./tests/test_cli.py $(if $(IS_GITHUB_CI),--report-log "$(PYTORCH_VERSION)_cli.log",)

+test_big_modeling:
+	python -m pytest -s -v ./tests/test_big_modeling.py ./tests/test_modeling_utils.py $(if $(IS_GITHUB_CI),--report-log "$(PYTORCH_VERSION)_big_modeling.log",)
+
 test_deepspeed:
 	python -m pytest -s -v ./tests/deepspeed $(if $(IS_GITHUB_CI),--report-log "$(PYTORCH_VERSION)_deepspeed.log",)
@@ -7,7 +7,7 @@ RUN pip install transformers evaluate datasets
 RUN git clone https://github.com/huggingface/accelerate.git

 RUN cd accelerate && \
-    pip install -e . && \
+    pip install -e .[deepspeed] && \
     cd benchmarks/fp8

 RUN /bin/bash
@@ -62,8 +62,8 @@
       title: Amazon SageMaker
     - local: usage_guides/mps
       title: Apple M1 GPUs
-    - local: usage_guides/ipex
-      title: IPEX training with CPU
+    - local: usage_guides/intel_cpu
+      title: Intel CPU
     - local: usage_guides/gaudi
       title: Intel Gaudi
     - local: usage_guides/compilation
@@ -139,7 +139,7 @@ values. They can also be passed in manually.
 * `--cpu` (`bool`) -- Whether or not to force the training on the CPU.
 * `--multi_gpu` (`bool`) -- Whether or not this should launch a distributed GPU training.
 * `--tpu` (`bool`) -- Whether or not this should launch a TPU training.
-* `--ipex` (`bool`) -- Whether or not this should launch an Intel Pytorch Extension (IPEX) training.
+* `--ipex` (`bool`) -- Whether or not this should launch an Intel Pytorch Extension (IPEX) training. **This argument is deprecated, will be removed in Accelerate v1.10**

 **Resource Selection Arguments**:
@@ -158,7 +158,7 @@ The following arguments are useful for selecting which training paradigm to use.
 * `--use_deepspeed` (`bool`) -- Whether or not to use DeepSpeed for training.
 * `--use_fsdp` (`bool`) -- Whether or not to use FullyShardedDataParallel for training.
 * `--use_megatron_lm` (`bool`) -- Whether or not to use Megatron-LM for training.
-* `--use_xpu` (`bool`) -- Whether to use IPEX plugin to speed up training on XPU specifically. **This argument is deprecated and ignored, will be removed in Accelerate v1.20**
+* `--use_xpu` (`bool`) -- Whether to use IPEX plugin to speed up training on XPU specifically. **This argument is deprecated and ignored, will be removed in Accelerate v1.10**

 **Distributed GPU Arguments**:
@@ -29,6 +29,11 @@ rendered properly in your Markdown viewer.
 [[autodoc]] tracking.WandBTracker
     - __init__

+## Trackio
+
+[[autodoc]] tracking.TrackioTracker
+    - __init__
+
 ## CometMLTracker

 [[autodoc]] tracking.CometMLTracker
@@ -48,3 +53,8 @@ rendered properly in your Markdown viewer.

 [[autodoc]] tracking.ClearMLTracker
     - __init__
+
+## SwanLabTracker
+
+[[autodoc]] tracking.SwanLabTracker
+    - __init__
@@ -245,7 +245,7 @@ As was pointed out in this [blog-post](https://huggingface.co/blog/gradient_accu

 > [...] for gradient accumulation across token-level tasks like causal LM training, the correct loss should be computed by the **total loss across all batches in a gradient accumulation step** divided by the **total number of all non padding tokens in those batches**. This is not the same as the average of the per-batch loss values.

-In other words, some adjustements must be made on losses that operate on a token-level basis.
+In other words, some adjustments must be made on losses that operate on a token-level basis.

 ### Skeleton code

@@ -282,7 +282,7 @@ for update_step in range(total_updates):
         num_items_in_batch = accelerator.gather(num_items_in_batch).sum().item()

         for i, batch in enumerate(batch_samples):
-            # if we perform gradient accumulation in a multi-devices set-up, we want to avoid unecessary communications when accumulating
+            # if we perform gradient accumulation in a multi-devices set-up, we want to avoid unnecessary communications when accumulating
             # cf: https://muellerzr.github.io/blog/gradient_accumulation.html
             if (i < len(batch_samples) - 1 and accelerator.num_processes > 1):
                 ctx = model.no_sync
@@ -294,7 +294,7 @@ for update_step in range(total_updates):
             with ctx():
                 inputs, targets = batch
                 outputs = model(inputs)
-                loss = loss_function(outputs, targets)  # the loss function shoud sum over samples rather than averaging
+                loss = loss_function(outputs, targets)  # the loss function should sum over samples rather than averaging

                 # We multiply by num_processes because the DDP calculates the average gradient across all devices whereas dividing by num_items_in_batch already takes into account all devices
                 # Same reason for gradient_accumulation_steps, but this times it's Accelerate that calculate the average gradient across the accumulated steps
@@ -394,7 +394,7 @@ for update_step in range(total_gradient_updates):
     for i, batch in enumerate(batch_samples):
         inputs, labels = batch["input_ids"], batch["labels"]
         total_batched_samples += 1
-        # if we perform gradient accumulation in a multi-devices set-up, we want to avoid unecessary communications when accumulating
+        # if we perform gradient accumulation in a multi-devices set-up, we want to avoid unnecessary communications when accumulating
         # cf: https://muellerzr.github.io/blog/gradient_accumulation.html
         if (i < len(batch_samples) - 1 and accelerator.num_processes > 1):
             ctx = model.no_sync
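To make the quoted correction concrete, here is a minimal sketch (the helper name and the -100 padding convention are assumptions, not taken from this diff): per-micro-batch losses are summed over tokens, and one global division by the token count of the whole accumulation window replaces per-batch averaging.

```python
import torch

def accumulate_token_level_loss(model, optimizer, micro_batches, num_processes=1):
    # Total number of non-padding tokens across the whole accumulation window
    # (assumes padding positions are labeled -100, the usual HF convention)
    num_items_in_batch = sum((b["labels"] != -100).sum().item() for b in micro_batches)
    optimizer.zero_grad()
    for batch in micro_batches:
        outputs = model(**batch)
        loss = outputs.loss  # assumed to be a *sum* over tokens, not a mean
        # One global division replaces per-batch averaging; the num_processes
        # factor compensates for DDP averaging gradients across devices.
        (loss * num_processes / num_items_in_batch).backward()
    optimizer.step()
```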
@@ -13,34 +13,11 @@ specific language governing permissions and limitations under the License.
 rendered properly in your Markdown viewer.
 -->

-# Intel® Extension for PyTorch
-
-[IPEX](https://github.com/intel/intel-extension-for-pytorch) is optimized for CPUs with AVX-512 or above, and functionally works for CPUs with only AVX2. So, it is expected to bring performance benefit for Intel CPU generations with AVX-512 or above while CPUs with only AVX2 (e.g., AMD CPUs or older Intel CPUs) might result in a better performance under IPEX, but not guaranteed. IPEX provides performance optimizations for CPU training with both Float32 and BFloat16. The usage of BFloat16 is the main focus of the following sections.
-
-Low precision data type BFloat16 has been natively supported on the 3rd Generation Xeon® Scalable Processors (aka Cooper Lake) with AVX512 instruction set and will be supported on the next generation of Intel® Xeon® Scalable Processors with Intel® Advanced Matrix Extensions (Intel® AMX) instruction set with further boosted performance. The Auto Mixed Precision for CPU backend has been enabled since PyTorch-1.10. At the same time, the support of Auto Mixed Precision with BFloat16 for CPU and BFloat16 optimization of operators has been massively enabled in Intel® Extension for PyTorch, and partially upstreamed to PyTorch master branch. Users can get better performance and user experience with IPEX Auto Mixed Precision.
-
-## IPEX installation:
-
-IPEX release is following PyTorch, to install via pip:
-
-| PyTorch Version | IPEX version |
-| :---------------: | :----------: |
-| 2.0 | 2.0.0 |
-| 1.13 | 1.13.0 |
-| 1.12 | 1.12.300 |
-| 1.11 | 1.11.200 |
-| 1.10 | 1.10.100 |
-
-```
-pip install intel_extension_for_pytorch==<version_name> -f https://developer.intel.com/ipex-whl-stable-cpu
-```
-
-Check more approaches for [IPEX installation](https://intel.github.io/intel-extension-for-pytorch/cpu/latest/tutorials/installation.html).
-
+# Training on Intel CPU

 ## How It Works For Training optimization in CPU

-Accelerate has integrated [IPEX](https://github.com/intel/intel-extension-for-pytorch), all you need to do is enabling it through the config.
+Accelerate has full support for Intel CPU, all you need to do is enabling it through the config.

 **Scenario 1**: Acceleration of No distributed CPU training
@@ -55,7 +32,6 @@ This machine
 Which type of machine are you using?
 No distributed training
 Do you want to run your training on CPU only (even if a GPU / Apple Silicon device is available)? [yes/NO]:yes
-Do you want to use Intel PyTorch Extension (IPEX) to speed up training on CPU? [yes/NO]:yes
 Do you wish to optimize your script with torch dynamo?[yes/NO]:NO
 Do you want to use DeepSpeed? [yes/NO]: NO
 -----------------------------------------------------------------------------------------------------------------------------------------------------------
@@ -69,15 +45,12 @@ default options when doing

 accelerate launch my_script.py --args_to_my_script
 ```

-For instance, here is how you would run the NLP example `examples/nlp_example.py` (from the root of the repo) with IPEX enabled.
-default_config.yaml that is generated after `accelerate config`
+For instance, here is how you would run the NLP example `examples/nlp_example.py` (from the root of the repo) with `default_config.yaml` which is generated by `accelerate config`

 ```bash
 compute_environment: LOCAL_MACHINE
 distributed_type: 'NO'
 downcast_bf16: 'no'
-ipex_config:
-  ipex: true
 machine_rank: 0
 main_training_function: main
 mixed_precision: bf16
@@ -117,7 +90,6 @@ What is the rank of this machine?
 What is the IP address of the machine that will host the main process? 36.112.23.24
 What is the port you will use to communicate with the main process? 29500
 Are all the machines on the same local network? Answer `no` if nodes are on the cloud and/or on different network hosts [YES/no]: yes
-Do you want to use Intel PyTorch Extension (IPEX) to speed up training on CPU? [yes/NO]:yes
 Do you want accelerate to launch mpirun? [yes/NO]: yes
 Please enter the path to the hostfile to use with mpirun [~/hostfile]: ~/hostfile
 Enter the number of oneCCL worker threads [1]: 1
@@ -129,13 +101,11 @@ bf16
 ```
 For instance, here is how you would run the NLP example `examples/nlp_example.py` (from the root of the repo) with IPEX enabled for distributed CPU training.

-default_config.yaml that is generated after `accelerate config`
+`default_config.yaml` which is generated by `accelerate config`
 ```bash
 compute_environment: LOCAL_MACHINE
 distributed_type: MULTI_CPU
 downcast_bf16: 'no'
-ipex_config:
-  ipex: true
 machine_rank: 0
 main_process_ip: 36.112.23.24
 main_process_port: 29500
@@ -156,8 +126,10 @@ use_cpu: true

 Set following env and using intel MPI to launch the training

-In node0, you need to create a configuration file which contains the IP addresses of each node (for example hostfile) and pass that configuration file path as an argument.
-If you selected to have Accelerate launch `mpirun`, ensure that the location of your hostfile matches the path in the config.
+In `node0`, you need to create a configuration file which contains the IP addresses of each node (for example hostfile) and pass that configuration file path as an argument.
+
+If you selected to let Accelerate launch `mpirun`, ensure that the location of your hostfile matches the path in the config.
+
 ```bash
 $ cat hostfile
 xxx.xxx.xxx.xxx #node0 ip
@@ -165,18 +137,18 @@ xxx.xxx.xxx.xxx #node1 ip
 xxx.xxx.xxx.xxx #node2 ip
 xxx.xxx.xxx.xxx #node3 ip
 ```
-When Accelerate is launching `mpirun`, source the oneCCL bindings setvars.sh to get your Intel MPI environment, and then
-run your script using `accelerate launch`. Note that the python script and environment needs to exist on all of the
-machines being used for multi-CPU training.

+Before executing `accelerate launch` command, you need source the oneCCL bindings `setvars.sh` to get your Intel MPI environment properly. Note that both the python script and environment need to be available on all of the machines being used for multi-CPU training.
+
 ```bash
 oneccl_bindings_for_pytorch_path=$(python -c "from oneccl_bindings_for_pytorch import cwd; print(cwd)")
 source $oneccl_bindings_for_pytorch_path/env/setvars.sh

 accelerate launch examples/nlp_example.py
 ```
-Otherwise, if you selected not to have Accelerate launch `mpirun`, run the following command in node0 and **16DDP** will
-be enabled in node0,node1,node2,node3 with BF16 mixed precision. When using this method, the python script, python
-environment, and accelerate config file need to be present on all of the machines used for multi-CPU training.

+You can also directly launch distributed training with `mpirun` command, you need to run the following command in node0 and **16DDP** will be enabled in node0,node1,node2,node3 with BF16 mixed precision. When using this method, the python script, python environment, and accelerate config file need to be available on all of the machines used for multi-CPU training.
+
 ```bash
 oneccl_bindings_for_pytorch_path=$(python -c "from oneccl_bindings_for_pytorch import cwd; print(cwd)")
 source $oneccl_bindings_for_pytorch_path/env/setvars.sh
@@ -185,11 +157,3 @@ export MASTER_ADDR=xxx.xxx.xxx.xxx #node0 ip
 export CCL_ATL_TRANSPORT=ofi
 mpirun -f hostfile -n 16 -ppn 4 accelerate launch examples/nlp_example.py
 ```
-
-## Related Resources
-
-- [Project's github](https://github.com/intel/intel-extension-for-pytorch)
-- [API docs](https://intel.github.io/intel-extension-for-pytorch/cpu/latest/tutorials/api_doc.html)
-- [Tuning guide](https://intel.github.io/intel-extension-for-pytorch/cpu/latest/tutorials/performance_tuning/tuning_guide.html)
-- [Blogs & Publications](https://intel.github.io/intel-extension-for-pytorch/cpu/latest/tutorials/blogs_publications.html)
@@ -20,10 +20,11 @@ Accelerate provides a general tracking API that can be used to log useful items

 ## Integrated Trackers

-Currently `Accelerate` supports seven trackers out-of-the-box:
+Currently `Accelerate` supports eight trackers out-of-the-box:

 - TensorBoard
 - WandB
+- Trackio
 - CometML
 - Aim
 - MLFlow
@@ -218,7 +218,7 @@ def parse_args():
         default="all",
         help=(
             'The integration to report the results and logs to. Supported platforms are `"tensorboard"`,'
-            ' `"wandb"`, `"comet_ml"`, and `"dvclive"`. Use `"all"` (default) to report to all integrations.'
+            ' `"wandb"`, `"comet_ml"`, `"dvclive"`, and `"swanlab"`. Use `"all"` (default) to report to all integrations.'
             "Only applicable when `--with_tracking` is passed."
         ),
     )
@@ -215,7 +215,7 @@ def parse_args():
         default="all",
         help=(
             'The integration to report the results and logs to. Supported platforms are `"tensorboard"`,'
-            ' `"wandb"`, `"comet_ml"`, and `"dvclive"`. Use `"all"` (default) to report to all integrations.'
+            ' `"wandb"`, `"comet_ml"`, and `"dvclive"`, and `"swanlab"`. Use `"all"` (default) to report to all integrations.'
             "Only applicable when `--with_tracking` is passed."
         ),
     )
@@ -31,8 +31,8 @@ from accelerate.utils import ProfileKwargs
 #
 # This example trains a Bert base model on GLUE MRPC
 # in any of the following settings (with the same script):
-#   - single CPU or single GPU
-#   - multi GPUS (using PyTorch distributed mode)
+#   - single CPU or single device (CUDA GPU, Intel XPU etc.)
+#   - multi devices (using PyTorch distributed mode)
 #   - (multi) TPUs
 #   - fp16 (mixed-precision) or fp32 (normal precision)
 #
@@ -183,7 +183,8 @@ def training_function(config, args):
     # New Code #
     accelerator.print(
         prof.key_averages().table(
-            sort_by="self_cpu_time_total" if args.cpu else "self_cuda_time_total", row_limit=-1
+            sort_by="self_cpu_time_total" if args.cpu else f"self_{accelerator.device.type}_time_total",
+            row_limit=-1,
         )
     )
@@ -215,7 +216,7 @@ def main():
         choices=["no", "fp16", "bf16", "fp8"],
         help="Whether to use mixed precision. Choose"
         "between fp16 and bf16 (bfloat16). Bf16 requires PyTorch >= 1.10."
-        "and an Nvidia Ampere GPU.",
+        "and an Nvidia Ampere GPU or an Intel XPU.",
     )
     # New Code #
     parser.add_argument(
@@ -11,8 +11,8 @@ fp8_config:
   fp8_format: E4M3
   interval: 1
   margin: 0
-  override_linear_precision: (false, false, false)
+  override_linear_precision: [false, false, false]
   # Generally this should always be set to `false` to have the most realistic fp8 eval performance
   use_autocast_during_eval: false
   # If using MS-AMP, we ignore all of the prior and set a opt_level
   #opt_level: O1
@@ -32,5 +32,5 @@ In our example, we use a 8B Llama3.1 model, which has a hidden dimension of 4096
 The figures above were generated on 8x H100 SXM GPUs, with 8192 sequence length and 1000 steps. To run the example, you can use the following command, where you can specify the precision to train in:

 ```bash
-accelerate launch --fsdp2_fp8.py --sequence_length 8192 --num_steps 1000 --log_with wandb --precision [fp8 | bf16]
+accelerate launch fsdp2_fp8.py --sequence-length 8192 --num-steps 1000 --log_with wandb --precision [fp8 | bf16]
 ```
@@ -187,7 +187,10 @@ def main():
     def collate_fn(batch):
         input_ids = torch.tensor([item["input_ids"] for item in batch], dtype=torch.long)
         labels = torch.tensor([item["labels"] for item in batch], dtype=torch.long)
-        return {"input_ids": input_ids, "labels": labels}
+        # Transformers expect `labels` to not be shifted, though we already shifted them, so we pass them both
+        # We need to pass both `shift_labels` and `labels` to the model, as the loss is calculated inside `if labels is not None`
+        # `shift_labels` take precedence over `labels` in this case
+        return {"input_ids": input_ids, "labels": labels, "shift_labels": labels}

     # We keep batch size at 1, as it is basically the same as sequence length, which we use instead
     dataloader = DataLoader(dataset, batch_size=1, collate_fn=collate_fn)
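A hedged sketch of the convention the new comments rely on (the helper below is hypothetical, not part of this diff): when `shift_labels` is supplied the loss uses it directly, otherwise `labels` is shifted by one position, which is why the collate function above can pre-shift once and pass the same tensor under both keys.

```python
import torch
import torch.nn.functional as F

def causal_lm_loss(logits, labels=None, shift_labels=None):
    # Hypothetical illustration: `shift_labels`, when given, is used as-is and
    # is assumed to already be aligned with `logits` along the sequence axis.
    if shift_labels is None:
        shift_labels = labels[..., 1:].contiguous()
        logits = logits[..., :-1, :].contiguous()
    return F.cross_entropy(
        logits.view(-1, logits.size(-1)), shift_labels.view(-1), ignore_index=-100
    )
```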
@@ -1,5 +1,5 @@
 accelerate # used to be installed in Amazon SageMaker environment
 evaluate
-datasets==2.3.2
+datasets
 schedulefree
 huggingface_hub>=0.20.0
setup.py: 13 changes
@@ -41,7 +41,16 @@ extras["deepspeed"] = ["deepspeed"]
 extras["rich"] = ["rich"]

 extras["test_fp8"] = ["torchao"]  # note: TE for now needs to be done via pulling down the docker image directly
-extras["test_trackers"] = ["wandb", "comet-ml", "tensorboard", "dvclive", "mlflow", "matplotlib"]
+extras["test_trackers"] = [
+    "wandb",
+    "comet-ml",
+    "tensorboard",
+    "dvclive",
+    "mlflow",
+    "matplotlib",
+    "swanlab",
+    "trackio",
+]
 extras["dev"] = extras["quality"] + extras["testing"] + extras["rich"]

 extras["sagemaker"] = [
@@ -50,7 +59,7 @@ extras["sagemaker"] = [

 setup(
     name="accelerate",
-    version="1.8.0.dev0",
+    version="1.9.0",
     description="Accelerate",
     long_description=open("README.md", encoding="utf-8").read(),
     long_description_content_type="text/markdown",
@@ -11,7 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-__version__ = "1.8.0.dev0"
+__version__ = "1.9.0"

 from .accelerator import Accelerator
 from .big_modeling import (
@@ -33,6 +33,8 @@ import torch
 import torch.utils.hooks as hooks
 from huggingface_hub import split_torch_state_dict_into_shards

+from accelerate.utils.dataclasses import FP8BackendType
+
 from .checkpointing import load_accelerator_state, load_custom_state, save_accelerator_state, save_custom_state
 from .data_loader import DataLoaderDispatcher, prepare_data_loader, skip_first_batches
 from .logging import get_logger
@@ -108,6 +110,7 @@ from .utils import (
     is_xpu_available,
     load_fsdp_model,
     load_fsdp_optimizer,
+    model_has_dtensor,
     pad_across_processes,
     parse_choice_from_env,
     recursively_apply,
@@ -124,6 +127,7 @@ from .utils.constants import (
     FSDP2_PYTORCH_VERSION,
     FSDP_PYTORCH_VERSION,
+    PROFILE_PATTERN_NAME,
     SCALER_NAME,
 )
 from .utils.modeling import get_state_dict_offloaded_model
 from .utils.other import compile_regions, compile_regions_deepspeed, is_compiled_module
@@ -229,7 +233,12 @@ class Accelerator:
             - `"all"`
             - `"tensorboard"`
             - `"wandb"`
+            - `"trackio"`
             - `"aim"`
             - `"comet_ml"`
             - `"mlflow"`
             - `"dvclive"`
+            - `"swanlab"`
             If `"all"` is selected, will pick up all available trackers in the environment and initialize them. Can
             also accept implementations of `GeneralTracker` for custom trackers, and can be combined with `"all"`.
         project_config ([`~utils.ProjectConfiguration`], *optional*):
@@ -298,6 +307,7 @@ class Accelerator:
             self.project_configuration = ProjectConfiguration(project_dir=project_dir)
         if project_dir is not None and self.project_dir is None:
             self.project_configuration.set_directories(project_dir)
+
         if mixed_precision is not None:
             mixed_precision = str(mixed_precision)
             if mixed_precision not in PrecisionType:
@@ -319,7 +329,7 @@ class Accelerator:

         if deepspeed_plugins is None:
             # First check if we're creating another `Accelerator` w/o setting `deepspeed_plugin`
-            if PartialState._shared_state != {} and PartialState().distributed_type == DistributedType.DEEPSPEED:
+            if AcceleratorState._shared_state != {} and AcceleratorState().distributed_type == DistributedType.DEEPSPEED:
                 deepspeed_plugins = AcceleratorState().deepspeed_plugins
             else:
                 # init from env variables
@@ -329,8 +339,8 @@ class Accelerator:
         else:
             # If we're creating a second `Accelerator`, users shouldn't be passing in a `deepspeed_plugin`
             if (
-                PartialState().distributed_type == DistributedType.DEEPSPEED
-                and AcceleratorState._shared_state != {}
+                AcceleratorState._shared_state != {}
+                and AcceleratorState().distributed_type == DistributedType.DEEPSPEED
                 and AcceleratorState().deepspeed_plugins is not None
             ):
                 raise NotImplementedError(
@@ -455,27 +465,34 @@ class Accelerator:

         # Check for automatic FP8 recipe creation
         if self.fp8_enabled and not self.has_fp8_handler:
-            # Prioritize AO -> TE -> MSAMP
-            if is_torchao_available():
-                logger.info("Found `torchao` installed, using it for FP8 training.")
+            if self.fp8_backend == FP8BackendType.AO:
                 self.ao_recipe_handler = AORecipeKwargs()
-            elif is_transformer_engine_available():
-                logger.info("Found `transformer-engine` installed, using it for FP8 training.")
+            elif self.fp8_backend == FP8BackendType.TE:
                 self.te_recipe_handler = TERecipeKwargs()
-            elif is_msamp_available():
-                logger.info("Found `msamp` installed, using it for FP8 training.")
+            elif self.fp8_backend == FP8BackendType.MSAMP:
                 self.msamp_recipe_handler = MSAMPRecipeKwargs()
-            else:
-                raise ImportError(
-                    "Tried to train with `fp8` and auto-detect backend, but no FP8-compatible backend was installed. "
-                    "Valid backends are: `torchao`, `transformer-engine`, and `msamp`."
-                )
+            elif self.fp8_backend == FP8BackendType.NO:
+                # Prioritize AO -> TE -> MSAMP
+                if is_torchao_available():
+                    logger.info("Found `torchao` installed, using it for FP8 training.")
+                    self.ao_recipe_handler = AORecipeKwargs()
+                elif is_transformer_engine_available():
+                    logger.info("Found `transformer-engine` installed, using it for FP8 training.")
+                    self.te_recipe_handler = TERecipeKwargs()
+                elif is_msamp_available():
+                    logger.info("Found `msamp` installed, using it for FP8 training.")
+                    self.msamp_recipe_handler = MSAMPRecipeKwargs()
+                else:
+                    raise ImportError(
+                        "Tried to train with `fp8` and auto-detect backend, but no FP8-compatible backend was installed. "
+                        "Valid backends are: `torchao`, `transformer-engine`, and `msamp`."
+                    )
+            self.has_fp8_handler = True

         self.delayed_fp8_autocast = False
         if self.has_fp8_handler:
             # We already check if FP8 is available during `self.state`
-            if mixed_precision != "fp8" and (
+            if not self.fp8_enabled and (
                 self.distributed_type not in (DistributedType.FSDP, DistributedType.DEEPSPEED)
             ):
                 raise ValueError("Passing in an FP8 configuration requires setting `mixed_precision='fp8'`.")
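The new branching honors an explicitly requested backend first and only falls back to auto-detection when none was configured. A standalone sketch of that priority chain (the `FP8Backend` enum below is an illustrative stand-in for accelerate's `FP8BackendType`, not the real class):

```python
import importlib.util
from enum import Enum

class FP8Backend(Enum):  # illustrative stand-in for FP8BackendType
    NO = "NO"
    AO = "AO"
    TE = "TE"
    MSAMP = "MSAMP"

def resolve_fp8_backend(requested: FP8Backend) -> FP8Backend:
    # An explicit choice always wins; otherwise auto-detect AO -> TE -> MSAMP
    if requested is not FP8Backend.NO:
        return requested
    for module, backend in [
        ("torchao", FP8Backend.AO),
        ("transformer_engine", FP8Backend.TE),
        ("msamp", FP8Backend.MSAMP),
    ]:
        if importlib.util.find_spec(module) is not None:
            return backend
    raise ImportError("No FP8-compatible backend installed.")
```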
@@ -485,7 +502,11 @@ class Accelerator:
             )

         # TODO: S1ro - this is probably gonna be a problem with other fp8 backends too
-        if self.fp8_backend == "AO" and self.state.fsdp_plugin.cpu_ram_efficient_loading:
+        if (
+            self.fp8_backend == FP8BackendType.AO
+            and self.state.distributed_type == DistributedType.FSDP
+            and self.state.fsdp_plugin.cpu_ram_efficient_loading
+        ):
             raise ValueError(
                 "torchao with FSDP2 and cpu_ram_efficient_loading is not supported, setting `cpu_ram_efficient_loading` to False will fix the issue and work as intended."
             )
@@ -512,6 +533,8 @@ class Accelerator:
             parse_choice_from_env("ACCELERATE_GRADIENT_ACCUMULATION_STEPS", gradient_accumulation_steps)
         )
         gradient_accumulation_plugin = GradientAccumulationPlugin(num_steps=gradient_accumulation_steps)
+
+        # If using DeepSpeed, update gradient accumulation steps from the DeepSpeed plugin
         self.gradient_state = GradientState(
             gradient_accumulation_plugin=gradient_accumulation_plugin,
         )
@@ -567,7 +590,7 @@ class Accelerator:
         elif self.fp8_enabled:
             # We always enable `native_amp` for FP8
             self.native_amp = True
-            if self.fp8_backend == "MSAMP":
+            if self.fp8_backend == FP8BackendType.MSAMP:
                 if self.distributed_type == DistributedType.FSDP:
                     raise NotImplementedError(
                         "`accelerate` + `MS-AMP` + `FSDP` is not supported at this time. "
@@ -1039,7 +1062,8 @@ class Accelerator:
         """
         context = contextlib.nullcontext
         if self.use_distributed:
-            context = getattr(model, "no_sync", context)
+            if self.distributed_type != DistributedType.DEEPSPEED or self.state.deepspeed_plugin.zero_stage < 2:
+                context = getattr(model, "no_sync", context)

         with context():
             yield
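The guard added here matters because under DeepSpeed ZeRO stages 2 and 3 gradients are partitioned across ranks, so skipping the reduction via `no_sync` would be incorrect. A minimal sketch of the selection logic, under the assumption that the wrapped model exposes `no_sync` only when it is a DDP-style wrapper:

```python
import contextlib

def grad_sync_context(model, use_distributed, is_deepspeed, zero_stage):
    # Only reach for `no_sync` when gradient all-reduce can safely be skipped
    context = contextlib.nullcontext
    if use_distributed and (not is_deepspeed or zero_stage < 2):
        context = getattr(model, "no_sync", context)
    return context()
```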
@@ -1404,11 +1428,18 @@ class Accelerator:
         old_named_params = self._get_named_parameters(*args, drop_refs=False)

         if self.distributed_type in [DistributedType.MULTI_CPU, DistributedType.MULTI_XPU, DistributedType.NO]:
-            if (self.device.type == "cpu" or self.device.type == "xpu") and self.state.use_ipex:
+            if (
+                is_torch_version("<", "2.7.0")
+                and (self.device.type == "cpu" or self.device.type == "xpu")
+                and self.state.use_ipex
+            ):
+                logger.warning(
+                    "You are using lower version of PyTorch(< 2.7.0) with ipex acceleration on Intel CPU or XPU, Intel has upstreamed most of the optimizations into stock PyTorch from 2.7.0, we enourage you to install the latest stock PyTorch and enjoy the out-of-experience on Intel CPU/XPU."
+                )
                 args = self._prepare_ipex(*args)
-        if self.fp8_backend == "TE":
+        if self.fp8_backend == FP8BackendType.TE:
             args = self._prepare_te(*args)
-        elif self.fp8_backend == "AO":
+        elif self.fp8_backend == FP8BackendType.AO:
             args = self._prepare_ao(*args)
         if self.distributed_type == DistributedType.DEEPSPEED:
             result = self._prepare_deepspeed(*args)
@@ -1417,7 +1448,7 @@ class Accelerator:
         elif self.is_fsdp2:
             result = self._prepare_fsdp2(*args)
         else:
-            if self.fp8_backend == "MSAMP":
+            if self.fp8_backend == FP8BackendType.MSAMP:
                 args, device_placement = self._prepare_msamp(*args, device_placement=device_placement)
             result = tuple(
                 self._prepare_one(obj, first_pass=True, device_placement=d) for obj, d in zip(args, device_placement)
@@ -1557,7 +1588,7 @@ class Accelerator:
             model._original_forward = model.forward
             autocast_context = get_mixed_precision_context_manager(self.native_amp, self.autocast_handler)
             # NOTE: MS-AMP adds `__func__` already to `model.forward`, so we should always use `model.forward`
-            if self.fp8_backend == "MSAMP" or not hasattr(model.forward, "__func__"):
+            if self.fp8_backend == FP8BackendType.MSAMP or not hasattr(model.forward, "__func__"):
                 model_forward_func = model.forward
                 model.forward = convert_outputs_to_fp32(autocast_context(model_forward_func))
             else:
@@ -1567,7 +1598,7 @@ class Accelerator:
                 model.forward = MethodType(convert_outputs_to_fp32(model.forward.__func__), model)

         # We prepare TE after, allowing for bf16 autocast to happen first
-        if self.fp8_backend == "TE" and not self.delayed_fp8_autocast:
+        if self.fp8_backend == FP8BackendType.TE and not self.delayed_fp8_autocast:
             model = apply_fp8_autowrap(model, self.te_recipe_handler or self.fp8_recipe_handler)

         if (getattr(model, "is_loaded_in_8bit", False) or getattr(model, "is_loaded_in_4bit", False)) and getattr(
@@ -1620,6 +1651,10 @@ class Accelerator:
             DistributedType.MULTI_XPU,
             DistributedType.MULTI_HPU,
         ):
+            if model_has_dtensor(model):
+                raise ValueError(
+                    "Your model contains `DTensor` parameters, which is incompatible with DDP. Maybe you loaded your model with `device_map='auto'`? Specify `device_map='cuda'` or 'cpu' instead."
+                )
             if any(p.requires_grad for p in model.parameters()):
                 kwargs = self.ddp_handler.to_kwargs() if self.ddp_handler is not None else {}
                 # TODO: Look at enabling native TP training directly with a proper config
@@ -1789,7 +1824,7 @@ class Accelerator:
         elif self.distributed_type == DistributedType.XLA and self.state.fork_launched:
             model = xmp.MpModelWrapper(model).to(self.device)
         # Now we can apply the FP8 autocast
-        if self.fp8_backend == "TE" and self.delayed_fp8_autocast:
+        if self.fp8_backend == FP8BackendType.TE and self.delayed_fp8_autocast:
             model = apply_fp8_autowrap(model, self.te_recipe_handler or self.fp8_recipe_handler)
         # torch.compile should be called last and only if the model isn't already compiled
         if self.state.dynamo_plugin.backend != DynamoBackend.NO and not is_compiled_module(model):
@@ -1867,7 +1902,7 @@ class Accelerator:
         import deepspeed

         ds_initialize = deepspeed.initialize
-        if self.fp8_backend == "MSAMP":
+        if self.fp8_backend == FP8BackendType.MSAMP:
             # MS-AMP requires DeepSpeed patches
             from msamp import deepspeed as msamp_deepspeed
@@ -1931,6 +1966,15 @@ class Accelerator:
                 gradient_accumulation_steps=self.gradient_accumulation_steps,
             )

+        deepspeed_gradient_accumulation_steps = deepspeed_plugin.get_value("gradient_accumulation_steps")
+        # update gradient_accumulation_steps if there is a mismatch
+        if deepspeed_gradient_accumulation_steps != self.gradient_accumulation_steps:
+            logger.warning(
+                f"Gradient accumulation steps mismatch: GradientAccumulationPlugin has {self.gradient_accumulation_steps}, "
+                f"DeepSpeed config has {deepspeed_gradient_accumulation_steps}. Using DeepSpeed's value."
+            )
+            self.gradient_accumulation_steps = deepspeed_gradient_accumulation_steps
+
         config_kwargs = {
             "gradient_clipping": 1.0,
             "zero_optimization.stage3_gather_16bit_weights_on_model_save": False,
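A small sketch of the reconciliation rule introduced above (the function name is hypothetical): when the two sources disagree, the value from the DeepSpeed config wins.

```python
def reconcile_grad_accum(accelerate_steps: int, ds_config: dict) -> int:
    # If the DeepSpeed config disagrees with the plugin, DeepSpeed's value wins
    ds_steps = ds_config.get("gradient_accumulation_steps", accelerate_steps)
    if ds_steps != accelerate_steps:
        print(
            f"Gradient accumulation steps mismatch: plugin has {accelerate_steps}, "
            f"DeepSpeed config has {ds_steps}. Using DeepSpeed's value."
        )
    return ds_steps
```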
@@ -1996,7 +2040,7 @@ class Accelerator:

         if model is not None:
             # If we are using FP8, we need to apply the autowrap now
-            if self.fp8_backend == "TE":
+            if self.fp8_backend == FP8BackendType.TE:
                 model = apply_fp8_autowrap(model, self.fp8_recipe_handler)
             # if the model is an MOE, set the appropriate MOE layers as leaf Z3 modules
             deepspeed_plugin.set_moe_leaf_modules(model)
@@ -2453,7 +2497,7 @@ class Accelerator:
             device_placement = self.device_placement
         # NOTE: Special case with MS-AMP we do *not* pass in the scaler explicitly to the `AcceleratedOptimizer`,
         # Their optimizer handles it for us.
-        scaler = None if self.fp8_backend == "MSAMP" else self.scaler
+        scaler = None if self.fp8_backend == FP8BackendType.MSAMP else self.scaler
         optimizer = AcceleratedOptimizer(optimizer, device_placement=device_placement, scaler=scaler)
         self._optimizers.append(optimizer)
         return optimizer
@@ -2523,7 +2567,7 @@ class Accelerator:
             # deepspeed handles loss scaling by gradient_accumulation_steps in its `backward`
             loss = loss / self.gradient_accumulation_steps
         if self.distributed_type == DistributedType.DEEPSPEED:
-            self.deepspeed_engine_wrapped.backward(loss, **kwargs)
+            self.deepspeed_engine_wrapped.backward(loss, sync_gradients=self.sync_gradients, **kwargs)
         elif self.distributed_type == DistributedType.MEGATRON_LM:
             return
         elif self.scaler is not None:
@@ -2664,8 +2708,9 @@ class Accelerator:
                 parameters, max_norm, norm_type=norm_type
             )  # viz: https://github.com/pytorch/torchtitan/blob/main/docs/fsdp.md
         elif self.distributed_type == DistributedType.DEEPSPEED:
-            # `accelerator.backward(loss)` is doing that automatically. Therefore, its implementation is not needed
-            # We cannot return the gradient norm because DeepSpeed does it.
+            # DeepSpeed handles gradient clipping internally, but we can retrieve the gradient norm
+            if self.deepspeed_engine_wrapped is not None:
+                return self.deepspeed_engine_wrapped.get_global_grad_norm()
             return None
         elif self.distributed_type == DistributedType.XLA:
             # Reduce gradients first for XLA
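With this change the same logging code can work across backends, since DeepSpeed now reports its internally computed norm instead of `None`. A hedged usage sketch (the helper name is hypothetical; the norm can still be `None`, e.g. before the first optimizer step):

```python
def clip_and_log(accelerator, model, max_norm=1.0):
    # Under DeepSpeed this now returns the engine's global gradient norm
    grad_norm = accelerator.clip_grad_norm_(model.parameters(), max_norm)
    if grad_norm is not None:
        accelerator.log({"grad_norm": float(grad_norm)})
```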
@@ -3495,6 +3540,21 @@ class Accelerator:
             else:
                 models.append(model)

+        # We need to load the scaler state before the optimizer for FSDP2
+        # (`torch.distributed.checkpoint.set_optimizer_state_dict`) which we use to set the state of the optimizer calls `optimizer.step` on
+        # a dummy tensor, but since the scaler is not initialized, it will raise an error (the scaler exists but its `_scale` is None)
+        scaler = None
+        if self.scaler is not None and self.is_fsdp2:
+            input_scaler_file = os.path.join(input_dir, SCALER_NAME)
+            scaler_state = torch.load(input_scaler_file)
+            self.scaler.load_state_dict(scaler_state)
+            # We also need to call the `_lazy_init_scale_growth_tracker` to initialize the scaler, as it would else be called
+            # on the first call to scale
+            self.scaler._lazy_init_scale_growth_tracker(self.scaler._device)
+            logger.info("GradScaler state loaded successfully")
+        else:
+            scaler = self.scaler
+
         # Load the optimizers taking care of FSDP and DeepSpeed nuances
         optimizers = []
         if self.distributed_type == DistributedType.FSDP:
@@ -3543,7 +3603,7 @@ class Accelerator:
             schedulers,
             dataloaders,
             self.state.process_index,
-            self.scaler,
+            scaler,
             map_location,
             load_kwargs,
             **load_model_func_kwargs,
@@ -3626,7 +3686,7 @@ class Accelerator:

         # we need this bit as `WeightWithDynamic...` returns 0 when `data_ptr()` is called,
         # the underlying pointer is actually hidden in `_tensor` attribute
-        if self.fp8_backend == "AO":
+        if self.fp8_backend == FP8BackendType.AO:
             from torchao.float8.fsdp_utils import WeightWithDynamicFloat8CastTensor

             accessor_mapping[WeightWithDynamicFloat8CastTensor] = "_tensor"
@@ -3713,8 +3773,7 @@ class Accelerator:
         elif self.is_fsdp2:
             from torch.distributed.checkpoint.state_dict import StateDictOptions, get_model_state_dict

-            # This hangs if `cpu_offload` is also True
-            options = StateDictOptions(full_state_dict=True, broadcast_from_rank0=True)
+            options = StateDictOptions(full_state_dict=True, broadcast_from_rank0=True, cpu_offload=True)
             state_dict = get_model_state_dict(model, options=options)
         elif self.distributed_type == DistributedType.FSDP:
             from torch.distributed.fsdp import FullStateDictConfig, StateDictType
@@ -3936,17 +3995,18 @@ class Accelerator:
         )

     @property
-    def fp8_backend(self):
+    def fp8_backend(self) -> FP8BackendType:
         "Returns the configured backend for training in FP8"
+        if self.has_fp8_handler:
             if self.fp8_recipe_handler is not None:
-                return self.fp8_recipe_handler.backend
+                return FP8BackendType(self.fp8_recipe_handler.backend)
             elif self.ao_recipe_handler is not None:
-                return "AO"
+                return FP8BackendType.AO
             elif self.te_recipe_handler is not None:
-                return "TE"
+                return FP8BackendType.TE
             elif self.msamp_recipe_handler is not None:
-                return "MSAMP"
+                return FP8BackendType.MSAMP
         elif self.state.deepspeed_plugin is not None and self.state.deepspeed_plugin.enable_msamp:
-            return "MSAMP"
-        return None
+            return FP8BackendType.MSAMP
+
+        return FP8BackendType(parse_choice_from_env("ACCELERATE_FP8_BACKEND", "NO"))
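Downstream comparisons accordingly move from magic strings to enum members, and the property now always returns a value, falling back to the `ACCELERATE_FP8_BACKEND` environment variable. A hedged usage sketch (helper name hypothetical; the import path appears in the diff above):

```python
from accelerate.utils.dataclasses import FP8BackendType

def uses_msamp(accelerator) -> bool:
    # The property now always yields an FP8BackendType member, so callers
    # compare against the enum instead of bare strings like "MSAMP"
    return accelerator.fp8_backend == FP8BackendType.MSAMP
```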
@@ -774,8 +774,8 @@ def get_cluster_input():
         )
         fp8_config["fp8_format"] = _ask_options(
             "Which weight format should be used?",
-            ["HYBRID", "E4M3"],
-            lambda x: "HYBRID" if x == 0 else "E4M3",
+            ["HYBRID", "E4M3", "E5M2"],
+            lambda i: ["HYBRID", "E4M3", "E5M2"][i],
             default=0,
         )
         fp8_config["amax_history_length"] = _ask_field(
@@ -692,8 +692,8 @@ def launch_command_parser(subparsers=None):
     fp8_args.add_argument(
         "--fp8_format",
         type=str,
-        default="E4M3",
-        choices=["E4M3", "HYBRID"],
+        default="HYBRID",
+        choices=["HYBRID", "E4M3", "E5M2"],
         help="The format to use for the FP8 recipe (useful only when `--fp8_backend=te` is passed).",
     )
     fp8_args.add_argument(
@@ -1151,6 +1151,12 @@ def _validate_launch_command(args):
             f"\t`--num_cpu_threads_per_process` was set to `{args.num_cpu_threads_per_process}` to improve out-of-box performance when training on CPUs"
         )

+    if args.ipex is not None:
+        logger.warning(
+            "ipex flag is deprecated, will be removed in Accelerate v1.10. "
+            "From 2.7.0, PyTorch has all needed optimizations for Intel CPU and XPU."
+        )
+
     if args.use_xpu is not None:
         logger.warning(
             "use_xpu is deprecated and ignored, will be removed in Accelerate v1.20. "
@@ -32,6 +32,7 @@ from .utils import (
     find_batch_size,
     get_data_structure,
     initialize_tensors,
+    is_datasets_available,
     is_torch_version,
     is_torchdata_stateful_dataloader_available,
     send_to_device,
@@ -565,7 +566,8 @@ class DataLoaderShard(DataLoaderAdapter, DataLoaderStateMixin):
         try:
             current_batch = next(dataloader_iter)
         except StopIteration:
             yield
+            self.end()
             return

         batch_index = 0
         while True:
@@ -1194,7 +1196,16 @@ def prepare_data_loader(
             dataloader.sampler.generator = generator
     # No change if no multiprocess
     if (num_processes != 1 or state.distributed_type == DistributedType.MEGATRON_LM) and not dispatch_batches:
-        if isinstance(new_dataset, IterableDataset):
+        if is_datasets_available():
+            from datasets import IterableDataset as DatasetsIterableDataset
+        if (
+            is_datasets_available()
+            and isinstance(new_dataset, DatasetsIterableDataset)
+            and not split_batches
+            and new_dataset.n_shards > num_processes
+        ):
+            new_dataset = new_dataset.shard(num_shards=num_processes, index=process_index)
+        elif isinstance(new_dataset, IterableDataset):
             if getattr(dataloader.dataset, "generator", None) is not None:
                 synchronized_generator = dataloader.dataset.generator
             new_dataset = IterableDatasetShard(
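The effect of the new branch: a `datasets.IterableDataset` with more shards than processes is split by shard assignment rather than wrapped in `IterableDatasetShard`, so each rank streams only its own files. A sketch under assumed values (dataset name and rank values are illustrative):

```python
from datasets import load_dataset

num_processes, process_index = 8, 3  # assumed values for illustration
ds = load_dataset("allenai/c4", "en", split="train", streaming=True)
if ds.n_shards > num_processes:
    # Each rank keeps a disjoint subset of the underlying shards
    ds = ds.shard(num_shards=num_processes, index=process_index)
```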
@@ -60,8 +60,8 @@ def notebook_launcher(

     <Tip warning={true}>

-    To use this function absolutely zero calls to a CUDA device must be made in the notebook session before calling. If
-    any have been made, you will need to restart the notebook and make sure no cells use any CUDA capability.
+    To use this function absolutely zero calls to a device must be made in the notebook session before calling. If any
+    have been made, you will need to restart the notebook and make sure no cells use any device capability.

     Setting `ACCELERATE_DEBUG_MODE="1"` in your environment will run a test before truly launching to ensure that none
     of those calls have been made.
@@ -76,11 +76,11 @@
             Tuple of arguments to pass to the function (it will receive `*args`).
         num_processes (`int`, *optional*):
             The number of processes to use for training. Will default to 8 in Colab/Kaggle if a TPU is available, to
-            the number of GPUs available otherwise.
+            the number of devices available otherwise.
         mixed_precision (`str`, *optional*, defaults to `"no"`):
-            If `fp16` or `bf16`, will use mixed precision training on multi-GPU.
+            If `fp16` or `bf16`, will use mixed precision training on multi-device.
         use_port (`str`, *optional*, defaults to `"29500"`):
-            The port to use to communicate between processes when launching a multi-GPU training.
+            The port to use to communicate between processes when launching a multi-device training.
         master_addr (`str`, *optional*, defaults to `"127.0.0.1"`):
             The address to use for communication between processes.
         node_rank (`int`, *optional*, defaults to 0):
@@ -105,7 +105,7 @@
     Example:

     ```python
-    # Assume this is defined in a Jupyter Notebook on an instance with two GPUs
+    # Assume this is defined in a Jupyter Notebook on an instance with two devices
     from accelerate import notebook_launcher
@@ -158,27 +158,27 @@
     else:
         if num_processes is None:
             raise ValueError(
-                "You have to specify the number of GPUs you would like to use, add `num_processes=...` to your call."
+                "You have to specify the number of devices you would like to use, add `num_processes=...` to your call."
             )
         if node_rank >= num_nodes:
             raise ValueError("The node_rank must be less than the number of nodes.")
         if num_processes > 1:
-            # Multi-GPU launch
+            # Multi-device launch
             from torch.distributed.launcher.api import LaunchConfig, elastic_launch
             from torch.multiprocessing import start_processes
             from torch.multiprocessing.spawn import ProcessRaisedException

             if len(AcceleratorState._shared_state) > 0:
                 raise ValueError(
-                    "To launch a multi-GPU training from your notebook, the `Accelerator` should only be initialized "
+                    "To launch a multi-device training from your notebook, the `Accelerator` should only be initialized "
                     "inside your training function. Restart your notebook and make sure no cells initializes an "
                     "`Accelerator`."
                 )
-            # Check for specific libraries known to initialize CUDA that users constantly use
+            # Check for specific libraries known to initialize device that users constantly use
             problematic_imports = are_libraries_initialized("bitsandbytes")
             if len(problematic_imports) > 0:
                 err = (
-                    "Could not start distributed process. Libraries known to initialize CUDA upon import have been "
+                    "Could not start distributed process. Libraries known to initialize device upon import have been "
                     "imported already. Please keep these imports inside your training function to try and help with this:"
                 )
                 for lib_name in problematic_imports:
@@ -203,24 +203,26 @@
             # process here (the other ones will be set be the launcher).
             with patch_environment(**patched_env):
                 # First dummy launch
+                device_type = torch.accelerator.current_accelerator().type if hasattr(torch, "accelerator") else "cuda"
+                distributed_type = "MULTI_XPU" if device_type == "xpu" else "MULTI_GPU"
                 if os.environ.get("ACCELERATE_DEBUG_MODE", "false").lower() == "true":
-                    launcher = PrepareForLaunch(test_launch, distributed_type="MULTI_GPU")
+                    launcher = PrepareForLaunch(test_launch, distributed_type=distributed_type)
                     try:
                         start_processes(launcher, args=(), nprocs=num_processes, start_method="fork")
                     except ProcessRaisedException as e:
                         err = "An issue was found when verifying a stable environment for the notebook launcher."
-                        if "Cannot re-initialize CUDA in forked subprocess" in e.args[0]:
+                        if f"Cannot re-initialize {device_type.upper()} in forked subprocess" in e.args[0]:
                             raise RuntimeError(
                                 f"{err}"
                                 "This likely stems from an outside import causing issues once the `notebook_launcher()` is called. "
                                 "Please review your imports and test them when running the `notebook_launcher()` to identify "
-                                "which one is problematic and causing CUDA to be initialized."
+                                f"which one is problematic and causing {device_type.upper()} to be initialized."
                             ) from e
                         else:
                             raise RuntimeError(f"{err} The following error was raised: {e}") from e
                 # Now the actual launch
-                launcher = PrepareForLaunch(function, distributed_type="MULTI_GPU")
-                print(f"Launching training on {num_processes} GPUs.")
+                launcher = PrepareForLaunch(function, distributed_type=distributed_type)
+                print(f"Launching training on {num_processes} {device_type.upper()}s.")
                 try:
                     if rdzv_conf is None:
                         rdzv_conf = {}
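The probe used above generalizes the launcher beyond CUDA. A slightly defensive variant of the same idea (assumes a PyTorch build that ships `torch.accelerator`; older versions fall back to assuming CUDA):

```python
import torch

def detect_device_type() -> str:
    # Prefer the generic accelerator API; assume CUDA on older PyTorch
    if hasattr(torch, "accelerator"):
        accel = torch.accelerator.current_accelerator()
        if accel is not None:
            return accel.type
    return "cuda"
```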
@@ -244,23 +246,25 @@
                     launch_config_kwargs["log_line_prefix_template"] = log_line_prefix_template
                 elastic_launch(config=LaunchConfig(**launch_config_kwargs), entrypoint=function)(*args)
             except ProcessRaisedException as e:
-                if "Cannot re-initialize CUDA in forked subprocess" in e.args[0]:
+                if f"Cannot re-initialize {device_type.upper()} in forked subprocess" in e.args[0]:
                     raise RuntimeError(
-                        "CUDA has been initialized before the `notebook_launcher` could create a forked subprocess. "
+                        f"{device_type.upper()} has been initialized before the `notebook_launcher` could create a forked subprocess. "
                         "This likely stems from an outside import causing issues once the `notebook_launcher()` is called. "
                         "Please review your imports and test them when running the `notebook_launcher()` to identify "
-                        "which one is problematic and causing CUDA to be initialized."
+                        f"which one is problematic and causing {device_type.upper()} to be initialized."
                     ) from e
                 else:
                     raise RuntimeError(f"An issue was found when launching the training: {e}") from e

     else:
-        # No need for a distributed launch otherwise as it's either CPU, GPU or MPS.
+        # No need for a distributed launch otherwise as it's either CPU, GPU, XPU or MPS.
         if is_mps_available():
             os.environ["PYTORCH_ENABLE_MPS_FALLBACK"] = "1"
             print("Launching training on MPS.")
         elif torch.cuda.is_available():
             print("Launching training on one GPU.")
+        elif torch.xpu.is_available():
+            print("Launching training on one XPU.")
         else:
             print("Launching training on CPU.")
         function(*args)
@@ -132,7 +132,7 @@ class PartialState:
             Whether or not to force the script to execute on CPU. Will ignore any accelerators available if set to
             `True` and force the execution on the CPU.
         kwargs (additional keyword arguments, *optional*):
-            Additional keyword arguments to pass to the relevent `init_process_group` function. Valid `kwargs` can be
+            Additional keyword arguments to pass to the relevant `init_process_group` function. Valid `kwargs` can be
             found in [`utils.InitProcessGroupKwargs`]. See the example section for detailed usage.

     **Available attributes:**
@@ -213,12 +213,6 @@ class PartialState:
             if self.backend == "tccl":
                 local_rank = os.environ.get("LOCAL_RANK", -1)
                 torch.sdaa.set_device(f"sdaa:{local_rank}")
-            if (
-                self.backend == "nccl"
-                and os.environ.get("ACCELERATE_USE_FSDP", "false") == "true"
-                and os.environ.get("FSDP_OFFLOAD_PARAMS", "false") == "true"
-            ):
-                self.backend = "cuda:nccl,cpu:gloo"
             dist.init_distributed(dist_backend=self.backend, auto_mpi_discovery=False, **kwargs)
             # We need to flag to `use_deepspeed` to be True to override `distributed_type` later
             use_deepspeed = True
@@ -230,6 +224,15 @@ class PartialState:
             if self.backend == "tccl":
                 local_rank = os.environ.get("LOCAL_RANK", -1)
                 torch.sdaa.set_device(f"sdaa:{local_rank}")
+            if (
+                self.backend == "nccl"
+                and os.environ.get("ACCELERATE_USE_FSDP", "false") == "true"
+                and (
+                    os.environ.get("FSDP_OFFLOAD_PARAMS", "false") == "true"
+                    or os.environ.get("FSDP_STATE_DICT_TYPE", "SHARDED_STATE_DICT") == "FULL_STATE_DICT"
+                )
+            ):
+                self.backend = "cuda:nccl,cpu:gloo"
             torch.distributed.init_process_group(backend=self.backend, **kwargs)

     # XPU and CPU require special env configs to be set
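The `cuda:nccl,cpu:gloo` string is a per-device backend map: NCCL cannot run collectives on CPU tensors, which FSDP needs when offloading parameters or materializing a full state dict. A minimal sketch of the same initialization (assumes the usual `RANK`/`WORLD_SIZE`/`MASTER_ADDR` environment is already set):

```python
import torch.distributed as dist

def init_collectives(offload_params: bool, full_state_dict: bool) -> None:
    backend = "nccl"
    if offload_params or full_state_dict:
        # Route CUDA collectives to NCCL and CPU collectives to Gloo
        backend = "cuda:nccl,cpu:gloo"
    dist.init_process_group(backend=backend)
```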
@@ -79,10 +79,6 @@ def mock_training(accelerator, model):

 def check_weights(operation, state_1, state_2):
     for weight_1, weight_2 in zip(state_1.values(), state_2.values()):
-        if str(weight_1.device) != torch_device:
-            weight_1 = weight_1.to(torch_device)
-        if str(weight_2.device) != torch_device:
-            weight_2 = weight_2.to(torch_device)
         if operation == "same":
             assert torch.allclose(weight_1, weight_2)
         else:
@@ -91,7 +87,7 @@ def check_weights(operation, state_1, state_2):

 def check_safetensors_weights(path, model):
     safe_state_dict = load_file(path / "model.safetensors")
-    safe_loaded_model = TinyModel()
+    safe_loaded_model = TinyModel().to(torch_device)
     check_weights("diff", model.state_dict(), safe_loaded_model.state_dict())
     safe_loaded_model.load_state_dict(safe_state_dict)
     check_weights("same", model.state_dict(), safe_loaded_model.state_dict())
@@ -99,7 +95,7 @@ def check_safetensors_weights(path, model):

 def check_pytorch_weights(path, model):
     nonsafe_state_dict = torch.load(path / "pytorch_model.bin", weights_only=True)
-    nonsafe_loaded_model = TinyModel()
+    nonsafe_loaded_model = TinyModel().to(torch_device)
     check_weights("diff", model.state_dict(), nonsafe_loaded_model.state_dict())
     nonsafe_loaded_model.load_state_dict(nonsafe_state_dict)
     check_weights("same", model.state_dict(), nonsafe_loaded_model.state_dict())
@@ -625,7 +625,7 @@ def training_check(use_seedable_sampler=False):
         msg=lambda msg: f"Did not obtain the same model on CPU or distributed training.\n{msg}",
     )

-    # IPEX support is only for CPU
+    # IPEX CPU tests
     if is_ipex_available():
         print("ipex BF16 training check.")
         AcceleratorState._reset_state()
@@ -61,6 +61,7 @@ from ..utils import (
     is_pytest_available,
     is_schedulefree_available,
     is_sdaa_available,
+    is_swanlab_available,
     is_tensorboard_available,
     is_timm_available,
     is_torch_version,
@@ -68,6 +69,7 @@ from ..utils import (
     is_torchao_available,
     is_torchdata_stateful_dataloader_available,
     is_torchvision_available,
+    is_trackio_available,
     is_transformer_engine_available,
     is_transformers_available,
     is_triton_available,
@ -249,6 +251,10 @@ def require_fp8(test_case):
|
||||
return unittest.skipUnless(fp8_is_available, "test requires FP8 support")(test_case)
|
||||
|
||||
|
||||
def require_fsdp2(test_case):
|
||||
return unittest.skipUnless(is_torch_version(">=", "2.5.0"), "test requires FSDP2 (torch >= 2.5.0)")(test_case)
|
||||
|
||||
|
||||
def require_mlu(test_case):
|
||||
"""
|
||||
Decorator marking a test that requires MLU. These tests are skipped when there are no MLU available.
|
||||
@ -454,6 +460,13 @@ def require_wandb(test_case):
|
||||
return unittest.skipUnless(is_wandb_available(), "test requires wandb")(test_case)
|
||||
|
||||
|
||||
def require_trackio(test_case):
|
||||
"""
|
||||
Decorator marking a test that requires trackio installed. These tests are skipped when trackio isn't installed
|
||||
"""
|
||||
return unittest.skipUnless(is_trackio_available(), "test requires trackio")(test_case)
|
||||
|
||||
|
||||
def require_comet_ml(test_case):
|
||||
"""
|
||||
Decorator marking a test that requires comet_ml installed. These tests are skipped when comet_ml isn't installed
|
||||
@ -482,6 +495,13 @@ def require_dvclive(test_case):
|
||||
return unittest.skipUnless(is_dvclive_available(), "test requires dvclive")(test_case)
|
||||
|
||||
|
||||
def require_swanlab(test_case):
|
||||
"""
|
||||
Decorator marking a test that requires swanlab installed. These tests are skipped when swanlab isn't installed
|
||||
"""
|
||||
return unittest.skipUnless(is_swanlab_available(), "test requires swanlab")(test_case)
|
||||
|
||||
|
||||
def require_pandas(test_case):
|
||||
"""
|
||||
Decorator marking a test that requires pandas installed. These tests are skipped when pandas isn't installed
|
||||
@ -536,7 +556,8 @@ def require_matplotlib(test_case):
|
||||
|
||||
|
||||
_atleast_one_tracker_available = (
|
||||
any([is_wandb_available(), is_tensorboard_available()]) and not is_comet_ml_available()
|
||||
any([is_wandb_available(), is_tensorboard_available(), is_trackio_available(), is_swanlab_available()])
|
||||
and not is_comet_ml_available()
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -34,7 +34,9 @@ from .utils import (
    is_comet_ml_available,
    is_dvclive_available,
    is_mlflow_available,
    is_swanlab_available,
    is_tensorboard_available,
    is_trackio_available,
    is_wandb_available,
    listify,
)
@@ -63,6 +65,12 @@ if is_clearml_available():
if is_dvclive_available():
    _available_trackers.append(LoggerType.DVCLIVE)

if is_swanlab_available():
    _available_trackers.append(LoggerType.SWANLAB)

if is_trackio_available():
    _available_trackers.append(LoggerType.TRACKIO)

logger = get_logger(__name__)


@@ -133,7 +141,7 @@ class GeneralTracker:

    def start(self):
        """
        Lazy initialization of the tracker inside Accelerator to avoid initalizing PartialState before
        Lazy initialization of the tracker inside Accelerator to avoid initializing PartialState before
        InitProcessGroupKwargs.
        """
        pass
@@ -332,7 +340,16 @@ class WandBTracker(GeneralTracker):
        """
        import wandb

        wandb.config.update(values, allow_val_change=True)
        if os.environ.get("WANDB_MODE") == "offline":
            # In offline mode, restart wandb with config included
            if hasattr(self, "run") and self.run:
                self.run.finish()

            init_kwargs = self.init_kwargs.copy()
            init_kwargs["config"] = values
            self.run = wandb.init(project=self.run_name, **init_kwargs)
        else:
            wandb.config.update(values, allow_val_change=True)
        logger.debug("Stored initial configuration hyperparameters to WandB")

    @on_main_process
@@ -411,6 +428,83 @@ class WandBTracker(GeneralTracker):
        logger.debug("WandB run closed")


class TrackioTracker(GeneralTracker):
    """
    A `Tracker` class that supports `trackio`. Should be initialized at the start of your script.

    Args:
        run_name (`str`):
            The name of the experiment run. Will be used as the `project` name when instantiating trackio.
        **kwargs (additional keyword arguments, *optional*):
            Additional key word arguments passed along to the `trackio.init` method. Refer to this
            [init](https://github.com/gradio-app/trackio/blob/814809552310468b13f84f33764f1369b4e5136c/trackio/__init__.py#L22)
            to see all supported key word arguments.
    """

    name = "trackio"
    requires_logging_directory = False
    main_process_only = False

    def __init__(self, run_name: str, **kwargs):
        super().__init__()
        self.run_name = run_name
        self.init_kwargs = kwargs

    @on_main_process
    def start(self):
        import trackio

        self.run = trackio.init(project=self.run_name, **self.init_kwargs)
        logger.debug(f"Initialized trackio project {self.run_name}")
        logger.debug(
            "Make sure to log any initial configurations with `self.store_init_configuration` before training!"
        )

    @property
    def tracker(self):
        return self.run

    @on_main_process
    def store_init_configuration(self, values: dict):
        """
        Logs `values` as hyperparameters for the run. Should be run at the beginning of your experiment.

        Args:
            values (Dictionary `str` to `bool`, `str`, `float` or `int`):
                Values to be stored as initial hyperparameters as key-value pairs. The values need to have type `bool`,
                `str`, `float`, `int`, or `None`.
        """
        import trackio

        trackio.config.update(values, allow_val_change=True)
        logger.debug("Stored initial configuration hyperparameters to trackio")

    @on_main_process
    def log(self, values: dict, step: Optional[int] = None, **kwargs):
        """
        Logs `values` to the current run.

        Args:
            values (Dictionary `str` to `str`, `float`, `int` or `dict` of `str` to `float`/`int`):
                Values to be logged as key-value pairs. The values need to have type `str`, `float`, `int` or `dict` of
                `str` to `float`/`int`.
            step (`int`, *optional*):
                The run step. If included, the log will be affiliated with this step.
            kwargs:
                Additional key word arguments passed along to the `trackio.log` method.
        """
        self.run.log(values, **kwargs)
        logger.debug("Successfully logged to trackio")

    @on_main_process
    def finish(self):
        """
        Closes `trackio` run
        """
        self.run.finish()
        logger.debug("trackio run closed")

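With the new tracker registered, enabling trackio from a training script goes through the usual Accelerator API. A minimal usage sketch (the project name and metrics are placeholders, and trackio must be installed on Python >= 3.10):

    from accelerate import Accelerator

    accelerator = Accelerator(log_with="trackio")
    accelerator.init_trackers("my-project", config={"lr": 3e-4})  # becomes the trackio project
    accelerator.log({"loss": 0.42}, step=1)
    accelerator.end_training()  # closes the trackio run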
class CometMLTracker(GeneralTracker):
    """
    A `Tracker` class that supports `comet_ml`. Should be initialized at the start of your script.
@@ -1061,6 +1155,106 @@ class DVCLiveTracker(GeneralTracker):
        self.live.end()


class SwanLabTracker(GeneralTracker):
    """
    A `Tracker` class that supports `swanlab`. Should be initialized at the start of your script.

    Args:
        run_name (`str`):
            The name of the experiment run.
        **kwargs (additional keyword arguments, *optional*):
            Additional key word arguments passed along to the `swanlab.init` method.
    """

    name = "swanlab"
    requires_logging_directory = False
    main_process_only = False

    def __init__(self, run_name: str, **kwargs):
        super().__init__()
        self.run_name = run_name
        self.init_kwargs = kwargs

    @on_main_process
    def start(self):
        import swanlab

        self.run = swanlab.init(project=self.run_name, **self.init_kwargs)
        swanlab.config["FRAMEWORK"] = "🤗Accelerate"  # add accelerate logo in config
        logger.debug(f"Initialized SwanLab project {self.run_name}")
        logger.debug(
            "Make sure to log any initial configurations with `self.store_init_configuration` before training!"
        )

    @property
    def tracker(self):
        return self.run

    @on_main_process
    def store_init_configuration(self, values: dict):
        """
        Logs `values` as hyperparameters for the run. Should be run at the beginning of your experiment.

        Args:
            values (Dictionary `str` to `bool`, `str`, `float` or `int`):
                Values to be stored as initial hyperparameters as key-value pairs. The values need to have type `bool`,
                `str`, `float`, `int`, or `None`.
        """
        import swanlab

        swanlab.config.update(values, allow_val_change=True)
        logger.debug("Stored initial configuration hyperparameters to SwanLab")

    @on_main_process
    def log(self, values: dict, step: Optional[int] = None, **kwargs):
        """
        Logs `values` to the current run.

        Args:
            data : Dict[str, DataType]
                Data must be a dict. The key must be a string with 0-9, a-z, A-Z, " ", "_", "-", "/". The value must be a
                `float`, `float convertible object`, `int` or `swanlab.data.BaseType`.
            step : int, optional
                The step number of the current data, if not provided, it will be automatically incremented.
                If step is duplicated, the data will be ignored.
            kwargs:
                Additional key word arguments passed along to the `swanlab.log` method. Likes:
                print_to_console : bool, optional
                    Whether to print the data to the console, the default is False.
        """
        self.run.log(values, step=step, **kwargs)
        logger.debug("Successfully logged to SwanLab")

    @on_main_process
    def log_images(self, values: dict, step: Optional[int] = None, **kwargs):
        """
        Logs `images` to the current run.

        Args:
            values (Dictionary `str` to `List` of `np.ndarray` or `PIL.Image`):
                Values to be logged as key-value pairs. The values need to have type `List` of `np.ndarray` or
            step (`int`, *optional*):
                The run step. If included, the log will be affiliated with this step.
            kwargs:
                Additional key word arguments passed along to the `swanlab.log` method. Likes:
                print_to_console : bool, optional
                    Whether to print the data to the console, the default is False.
        """
        import swanlab

        for k, v in values.items():
            self.log({k: [swanlab.Image(image) for image in v]}, step=step, **kwargs)
        logger.debug("Successfully logged images to SwanLab")

    @on_main_process
    def finish(self):
        """
        Closes `swanlab` writer
        """
        self.run.finish()
        logger.debug("SwanLab run closed")

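The same pattern applies to the new SwanLab integration; per-tracker `init_kwargs` are forwarded to `swanlab.init`. A minimal sketch with placeholder names:

    from accelerate import Accelerator

    accelerator = Accelerator(log_with="swanlab")
    accelerator.init_trackers(
        "demo-project",
        config={"epochs": 10},
        init_kwargs={"swanlab": {"experiment_name": "run-1"}},  # forwarded to swanlab.init
    )
    accelerator.log({"loss": 0.1}, step=1)
    accelerator.end_training()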
LOGGER_TYPE_TO_CLASS = {
    "aim": AimTracker,
    "comet_ml": CometMLTracker,
@@ -1069,6 +1263,8 @@ LOGGER_TYPE_TO_CLASS = {
    "wandb": WandBTracker,
    "clearml": ClearMLTracker,
    "dvclive": DVCLiveTracker,
    "swanlab": SwanLabTracker,
    "trackio": TrackioTracker,
}


@@ -1090,9 +1286,12 @@ def filter_trackers(
            - `"all"`
            - `"tensorboard"`
            - `"wandb"`
            - `"trackio"`
            - `"aim"`
            - `"comet_ml"`
            - `"mlflow"`
            - `"dvclive"`
            - `"swanlab"`
            If `"all"` is selected, will pick up all available trackers in the environment and initialize them. Can
            also accept implementations of `GeneralTracker` for custom trackers, and can be combined with `"all"`.
        logging_dir (`str`, `os.PathLike`, *optional*):

@@ -121,6 +121,7 @@ from .imports import (
    is_sagemaker_available,
    is_schedulefree_available,
    is_sdaa_available,
    is_swanlab_available,
    is_tensorboard_available,
    is_timm_available,
    is_torch_xla_available,
@@ -128,6 +129,7 @@ from .imports import (
    is_torchdata_available,
    is_torchdata_stateful_dataloader_available,
    is_torchvision_available,
    is_trackio_available,
    is_transformer_engine_available,
    is_transformers_available,
    is_triton_available,
@@ -281,6 +283,7 @@ from .other import (
    is_port_in_use,
    load,
    merge_dicts,
    model_has_dtensor,
    recursive_getattr,
    save,
    wait_for_everyone,

@@ -289,7 +289,7 @@ class InitProcessGroupKwargs(KwargsHandler):
# Literals
Backend = Literal["MSAMP", "TE"]
OptLevel = Literal["O1", "O2"]
FP8Format = Literal["E4M3", "HYBRID"]
FP8Format = Literal["HYBRID", "E4M3", "E5M2"]
AmaxComputeAlgorithm = Literal["max", "most_recent"]


@@ -342,8 +342,8 @@ class TERecipeKwargs(KwargsHandler):
        interval (`int`, *optional*, default to 1):
            The interval to use for how often the scaling factor is recomputed.
        fp8_format (`str`, *optional*, default to "HYBRID"):
            The format to use for the FP8 recipe. Must be one of `HYBRID` or `E4M3`. (Generally `HYBRID` for training,
            `E4M3` for evaluation)
            The format to use for the FP8 recipe. Must be one of `HYBRID`, `E4M3` or `E5M2`. (Generally `HYBRID` for
            training, `E4M3` or `E5M2` for evaluation)
        amax_history_len (`int`, *optional*, default to 1024):
            The length of the history to use for the scaling factor computation
        amax_compute_algo (`str`, *optional*, default to "most_recent"):
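Selecting the newly allowed `E5M2` format goes through `TERecipeKwargs`. A hedged sketch (TransformerEngine-capable hardware is assumed):

    from accelerate import Accelerator
    from accelerate.utils import TERecipeKwargs

    # HYBRID remains the usual training choice; E4M3/E5M2 are generally for evaluation
    accelerator = Accelerator(mixed_precision="fp8", kwargs_handlers=[TERecipeKwargs(fp8_format="E5M2")])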
@@ -616,8 +616,10 @@ class FP8BackendType(str, enum.Enum):
    """

    # Subclassing str as well as Enum allows the `FP8BackendType` to be JSON-serializable out of the box.
    NO = "NO"
    TE = "TE"
    MSAMP = "MSAMP"
    AO = "AO"


class ComputeEnvironment(str, enum.Enum):
@@ -699,18 +701,24 @@ class LoggerType(BaseEnum):
        - **ALL** -- all available trackers in the environment that are supported
        - **TENSORBOARD** -- TensorBoard as an experiment tracker
        - **WANDB** -- wandb as an experiment tracker
        - **TRACKIO** -- trackio as an experiment tracker
        - **COMETML** -- comet_ml as an experiment tracker
        - **MLFLOW** -- mlflow as an experiment tracker
        - **CLEARML** -- clearml as an experiment tracker
        - **DVCLIVE** -- dvclive as an experiment tracker
        - **SWANLAB** -- swanlab as an experiment tracker
    """

    ALL = "all"
    AIM = "aim"
    TENSORBOARD = "tensorboard"
    WANDB = "wandb"
    TRACKIO = "trackio"
    COMETML = "comet_ml"
    MLFLOW = "mlflow"
    CLEARML = "clearml"
    DVCLIVE = "dvclive"
    SWANLAB = "swanlab"


class PrecisionType(str, BaseEnum):
@@ -2088,6 +2096,8 @@ class TorchTensorParallelPlugin:
        # support for other devices has to be investigated
        if is_hpu_available(init_hccl=True):
            device = "hpu"
        elif is_xpu_available():
            device = "xpu"
        else:
            device = "cuda"

@@ -261,22 +261,36 @@ class DeepSpeedEngineWrapper:
    def __init__(self, engine):
        self.engine = engine

    def backward(self, loss, **kwargs):
    def backward(self, loss, sync_gradients=True, **kwargs):
        # Set gradient accumulation boundary based on Accelerate's sync_gradients state
        # This tells DeepSpeed whether this is the final micro-batch before gradient sync
        self.engine.set_gradient_accumulation_boundary(is_boundary=sync_gradients)

        # runs backpropagation and handles mixed precision
        self.engine.backward(loss, **kwargs)

        # Deepspeed's `engine.step` performs the following operations:
        # - gradient accumulation check
        # - gradient clipping
        # - optimizer step
        # - zero grad
        # - checking overflow
        # - lr_scheduler step (only if engine.lr_scheduler is not None)
        self.engine.step()
        # Only perform step and related operations at gradient accumulation boundaries
        if sync_gradients:
            # Deepspeed's `engine.step` performs the following operations:
            # - gradient accumulation check
            # - gradient clipping
            # - optimizer step
            # - zero grad
            # - checking overflow
            # - lr_scheduler step (only if engine.lr_scheduler is not None)
            self.engine.step()
        # and this plugin overrides the above calls with no-ops when Accelerate runs under
        # Deepspeed, but allows normal functionality for non-Deepspeed cases thus enabling a simple
        # training loop that works transparently under many training regimes.

    def get_global_grad_norm(self):
        """Get the global gradient norm from DeepSpeed engine."""
        grad_norm = self.engine.get_global_grad_norm()
        # Convert to scalar if it's a tensor
        if hasattr(grad_norm, "item"):
            return grad_norm.item()
        return grad_norm


class DeepSpeedOptimizerWrapper(AcceleratedOptimizer):
    """

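From the user's side, the boundary handling stays behind `Accelerator.accumulate`; the wrapper above receives `sync_gradients` through `Accelerator.backward`. A minimal training-loop sketch, assuming `accelerator`, `model`, `optimizer`, and `dataloader` were prepared beforehand:

    for batch in dataloader:
        with accelerator.accumulate(model):
            loss = model(**batch).loss
            # sets the DeepSpeed boundary and steps the engine only when syncing
            accelerator.backward(loss)
            if accelerator.sync_gradients:
                optimizer.step()
                optimizer.zero_grad()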
@@ -179,10 +179,9 @@ def load_fsdp_model(fsdp_plugin, accelerator, model, input_dir, model_index=0, a
        else nullcontext()
    )
    sd_options = _prepare_sd_options(fsdp_plugin)

    with ctx:
        if fsdp_plugin.state_dict_type == StateDictType.FULL_STATE_DICT:
            if type(model) is not FSDP and accelerator.process_index != 0:
            if type(model) is not FSDP and accelerator.process_index != 0 and not accelerator.is_fsdp2:
                if not fsdp_plugin.sync_module_states and fsdp_plugin.fsdp_version == 1:
                    raise ValueError(
                        "Set the `sync_module_states` flag to `True` so that model states are synced across processes when "
@@ -192,7 +191,12 @@ def load_fsdp_model(fsdp_plugin, accelerator, model, input_dir, model_index=0, a
            weights_name = f"{FSDP_MODEL_NAME}.bin" if model_index == 0 else f"{FSDP_MODEL_NAME}_{model_index}.bin"
            input_model_file = os.path.join(input_dir, weights_name)
            logger.info(f"Loading model from {input_model_file}")
            state_dict = torch.load(input_model_file, weights_only=True)
            # we want an empty state dict for FSDP2 as we use `broadcast_from_rank0`
            load_model = not accelerator.is_fsdp2 or accelerator.is_main_process
            if load_model:
                state_dict = torch.load(input_model_file, weights_only=True)
            else:
                state_dict = {}
            logger.info(f"Model loaded from {input_model_file}")
        elif fsdp_plugin.state_dict_type == StateDictType.LOCAL_STATE_DICT:
            weights_name = (
@@ -498,10 +502,10 @@ def fsdp2_load_full_state_dict(accelerator, model: torch.nn.Module, full_sd: dic

    if accelerator.is_main_process:
        for (param_name, full_param), sharded_param in zip(full_sd.items(), meta_sharded_sd.values()):
            full_param = full_param.detach().cuda()
            mesh = sharded_param.device_mesh
            dist.broadcast(full_param, src=0, group=mesh.get_group())
            sharded_tensor = distribute_tensor(full_param, mesh, sharded_param.placements)
            device_mesh = sharded_param.device_mesh
            full_param = full_param.detach().to(device_mesh.device_type)
            dist.broadcast(full_param, src=0, group=device_mesh.get_group())
            sharded_tensor = distribute_tensor(full_param, device_mesh, sharded_param.placements)
            to_contiguous, casting_dtype = _infer_parameter_dtype(
                model,
                param_name,
@@ -512,10 +516,10 @@ def fsdp2_load_full_state_dict(accelerator, model: torch.nn.Module, full_sd: dic
    # We need this else to have a matching `broadcast` for all of the ranks, else we deadlock
    else:
        for param_name, sharded_param in meta_sharded_sd.items():
            full_tensor = torch.empty(sharded_param.size(), device="cuda", dtype=sharded_param.dtype)
            mesh = sharded_param.device_mesh
            dist.broadcast(full_tensor, src=0, group=mesh.get_group())
            sharded_tensor = distribute_tensor(full_tensor, mesh, sharded_param.placements)
            device_mesh = sharded_param.device_mesh
            full_tensor = torch.empty(sharded_param.size(), device=device_mesh.device_type, dtype=sharded_param.dtype)
            dist.broadcast(full_tensor, src=0, group=device_mesh.get_group())
            sharded_tensor = distribute_tensor(full_tensor, device_mesh, sharded_param.placements)
            to_contiguous, casting_dtype = _infer_parameter_dtype(
                model,
                param_name,

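The replacement of hard-coded `.cuda()`/`device="cuda"` with the mesh's `device_type` makes the broadcast path device-agnostic. A small sketch of where that string comes from (shown for CUDA; on XPU or HPU builds the mesh would report "xpu" or "hpu"):

    from torch.distributed.device_mesh import init_device_mesh

    mesh = init_device_mesh("cuda", (2,))  # a two-way sharding mesh
    print(mesh.device_type)  # -> "cuda"; used instead of assuming CUDA everywhere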
@@ -15,6 +15,7 @@
import importlib
import importlib.metadata
import os
import sys
import warnings
from functools import lru_cache, wraps

@@ -281,6 +282,14 @@ def is_comet_ml_available():
    return _is_package_available("comet_ml")


def is_swanlab_available():
    return _is_package_available("swanlab")


def is_trackio_available():
    return sys.version_info >= (3, 10) and _is_package_available("trackio")


def is_boto3_available():
    return _is_package_available("boto3")

@@ -89,9 +89,9 @@ def setup_fp8_env(args: argparse.Namespace, current_env: dict[str, str]):
        value = getattr(args, arg)
        if value is not None:
            if arg == "fp8_override_linear_precision":
                current_env[prefix + "FP8_OVERRIDE_FPROP"] = value[0]
                current_env[prefix + "FP8_OVERRIDE_DGRAD"] = value[1]
                current_env[prefix + "FP8_OVERRIDE_WGRAD"] = value[2]
                current_env[prefix + "FP8_OVERRIDE_FPROP"] = str(value[0])
                current_env[prefix + "FP8_OVERRIDE_DGRAD"] = str(value[1])
                current_env[prefix + "FP8_OVERRIDE_WGRAD"] = str(value[2])
            else:
                current_env[f"{prefix}{arg.upper()}"] = str(getattr(args, arg))
    return current_env

@@ -121,7 +121,7 @@ def find_executable_batch_size(
):
    """
    A basic decorator that will try to execute `function`. If it fails from exceptions related to out-of-memory or
    CUDNN, the batch size is cut in half and passed to `function`
    CUDNN, the batch size is multiplied by 0.9 and passed to `function`

    `function` must take in a `batch_size` parameter as its first argument.

@@ -153,7 +153,7 @@ def find_executable_batch_size(

    def reduce_batch_size_fn():
        nonlocal batch_size
        batch_size = batch_size // 2
        batch_size = int(batch_size * 0.9)
        return batch_size

    def decorator(*args, **kwargs):

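Callers interact with the new decay through the same decorator API; only the shrink factor changed from halving to a 10% reduction. A usage sketch with a hypothetical `train` function:

    from accelerate.utils import find_executable_batch_size

    @find_executable_batch_size(starting_batch_size=128)
    def train(batch_size):
        ...  # an OOM here retries with int(batch_size * 0.9)

    train()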
@@ -222,6 +222,8 @@ def set_module_tensor_to_device(
    dtype: Optional[Union[str, torch.dtype]] = None,
    fp16_statistics: Optional[torch.HalfTensor] = None,
    tied_params_map: Optional[dict[int, dict[torch.device, torch.Tensor]]] = None,
    non_blocking: bool = False,
    clear_cache: bool = True,
):
    """
    A helper function to set a given tensor (parameter or buffer) of a module on a specific device (note that doing
@@ -245,6 +247,10 @@ def set_module_tensor_to_device(
            A map of current data pointers to dictionaries of devices to already dispatched tied weights. For a given
            execution device, this parameter is useful to reuse the first available pointer of a shared weight on the
            device for all others, instead of duplicating memory.
        non_blocking (`bool`, *optional*, defaults to `False`):
            If `True`, the device transfer will be asynchronous with respect to the host, if possible.
        clear_cache (`bool`, *optional*, defaults to `True`):
            Whether or not to clear the device cache after setting the tensor on the device.
    """
    # Recurse if needed
    if "." in tensor_name:
@@ -295,9 +301,9 @@ def set_module_tensor_to_device(

        if dtype is None:
            # For compatibility with PyTorch load_state_dict which converts state dict dtype to existing dtype in model
            value = value.to(old_value.dtype)
            value = value.to(old_value.dtype, non_blocking=non_blocking)
        elif not str(value.dtype).startswith(("torch.uint", "torch.int", "torch.bool")):
            value = value.to(dtype)
            value = value.to(dtype, non_blocking=non_blocking)

    device_quantization = None
    with torch.no_grad():
@@ -326,15 +332,15 @@ def set_module_tensor_to_device(
        if "xpu" in str(device) and not is_xpu_available():
            raise ValueError(f'{device} is not available, you should use device="cpu" instead')
        if value is None:
            new_value = old_value.to(device)
            new_value = old_value.to(device, non_blocking=non_blocking)
            if dtype is not None and device in ["meta", torch.device("meta")]:
                if not str(old_value.dtype).startswith(("torch.uint", "torch.int", "torch.bool")):
                    new_value = new_value.to(dtype)
                    new_value = new_value.to(dtype, non_blocking=non_blocking)

                if not is_buffer:
                    module._parameters[tensor_name] = param_cls(new_value, requires_grad=old_value.requires_grad)
        elif isinstance(value, torch.Tensor):
            new_value = value.to(device)
            new_value = value.to(device, non_blocking=non_blocking)
        else:
            new_value = torch.tensor(value, device=device)
        if device_quantization is not None:
@@ -347,24 +353,30 @@ def set_module_tensor_to_device(
        if param_cls.__name__ in ["Int8Params", "FP4Params", "Params4bit"]:
            if param_cls.__name__ == "Int8Params" and new_value.dtype == torch.float32:
                # downcast to fp16 if any - needed for 8bit serialization
                new_value = new_value.to(torch.float16)
                new_value = new_value.to(torch.float16, non_blocking=non_blocking)
            # quantize module that are going to stay on the cpu so that we offload quantized weights
            if device == "cpu" and param_cls.__name__ == "Int8Params":
                new_value = param_cls(new_value, requires_grad=old_value.requires_grad, **kwargs).to(0).to("cpu")
                new_value.CB = new_value.CB.to("cpu")
                new_value.SCB = new_value.SCB.to("cpu")
            else:
                new_value = param_cls(new_value, requires_grad=old_value.requires_grad, **kwargs).to(device)
                new_value = param_cls(new_value, requires_grad=old_value.requires_grad, **kwargs).to(
                    device, non_blocking=non_blocking
                )
        elif param_cls.__name__ in ["QTensor", "QBitsTensor"]:
            new_value = torch.nn.Parameter(new_value, requires_grad=old_value.requires_grad).to(device)
            new_value = torch.nn.Parameter(new_value, requires_grad=old_value.requires_grad).to(
                device, non_blocking=non_blocking
            )
        elif param_cls.__name__ in ["AffineQuantizedTensor"]:
            new_value = new_value.to(device)
            new_value = new_value.to(device, non_blocking=non_blocking)
        else:
            new_value = param_cls(new_value, requires_grad=old_value.requires_grad).to(device)
            new_value = param_cls(new_value, requires_grad=old_value.requires_grad).to(
                device, non_blocking=non_blocking
            )

        module._parameters[tensor_name] = new_value
        if fp16_statistics is not None:
            module._parameters[tensor_name].SCB = fp16_statistics.to(device)
            module._parameters[tensor_name].SCB = fp16_statistics.to(device, non_blocking=non_blocking)
            del fp16_statistics
        # as we put the weight to meta, it doesn't have SCB attr anymore. make sure that it is not a meta weight
        if (
@@ -390,8 +402,9 @@ def set_module_tensor_to_device(
            device_index = torch.device(device).index if torch.device(device).type == "cuda" else None
            if not getattr(module.weight, "quant_state", None) and device_index is not None:
                module.weight = module.weight.cuda(device_index)

    # clean pre and post forward hook
    if device != "cpu":
    if clear_cache and device != "cpu":
        clear_device_cache()

    # When handling tied weights, we update tied_params_map to keep track of the tied weights that have already been allocated on the device in
|
||||
model (`torch.nn.Module`): The model to check the device map against.
|
||||
device_map (`Dict[str, Union[int, str, torch.device]]`): The device map to check.
|
||||
"""
|
||||
all_module_names = dict(model.named_modules())
|
||||
invalid_keys = [k for k in device_map if k != "" and k not in all_module_names]
|
||||
|
||||
if invalid_keys:
|
||||
warnings.warn(
|
||||
f"The following device_map keys do not match any submodules in the model: {invalid_keys}", UserWarning
|
||||
)
|
||||
|
||||
all_model_tensors = [name for name, _ in model.state_dict().items()]
|
||||
for module_name in device_map.keys():
|
||||
if module_name == "":
|
||||
|
||||
@ -194,6 +194,26 @@ def compile_regions_deepspeed(module: torch.nn.Module, **compile_kwargs):
|
||||
module.compile(**compile_kwargs)
|
||||
|
||||
|
||||
def model_has_dtensor(model: torch.nn.Module) -> bool:
|
||||
"""
|
||||
Check if the model has DTensor parameters.
|
||||
|
||||
Args:
|
||||
model (`torch.nn.Module`):
|
||||
The model to check.
|
||||
|
||||
Returns:
|
||||
`bool`: Whether the model has DTensor parameters.
|
||||
"""
|
||||
if is_torch_version(">=", "2.5.0"):
|
||||
from torch.distributed.tensor import DTensor
|
||||
else:
|
||||
# from torch 2.0.0 (oldest supported accelerate torch version), DTensor is in torch.distributed._tensor
|
||||
from torch.distributed._tensor import DTensor
|
||||
|
||||
return any(isinstance(p, DTensor) for p in model.parameters())
|
||||
|
||||
|
||||
def extract_model_from_parallel(
|
||||
model, keep_fp32_wrapper: bool = True, keep_torch_compile: bool = True, recursive: bool = False
|
||||
):
|
||||
|
||||
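`model_has_dtensor` is also exported from `accelerate.utils` (see the import hunk earlier) and is meant as a cheap guard in front of code paths that cannot handle sharded parameters. A sketch, assuming `model` is an arbitrary `nn.Module`:

    from accelerate.utils import model_has_dtensor

    if model_has_dtensor(model):
        raise RuntimeError("gather or unshard the DTensor parameters before exporting this model")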
240 tests/deepspeed/test_deepspeed_gradient_accumulation.py (new file)
@@ -0,0 +1,240 @@
# Copyright 2022 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import inspect
import json
from pathlib import Path

import torch
from torch.utils.data import DataLoader
from transformers import AutoModel
from transformers.trainer_utils import set_seed

from accelerate.accelerator import Accelerator
from accelerate.test_utils.testing import AccelerateTestCase, require_deepspeed
from accelerate.test_utils.training import RegressionDataset
from accelerate.utils import patch_environment
from accelerate.utils.dataclasses import DeepSpeedPlugin


set_seed(42)

GPT2_TINY = "hf-internal-testing/tiny-random-gpt2"
ZERO2 = "zero2"
ZERO3 = "zero3"
FP16 = "fp16"


@require_deepspeed
class DeepSpeedGradientAccumulationTest(AccelerateTestCase):
    def setUp(self):
        super().setUp()

        self._test_file_path = inspect.getfile(self.__class__)
        path = Path(self._test_file_path).resolve()
        self.test_file_dir_str = str(path.parents[0])

        self.ds_config_file = dict(
            zero2=f"{self.test_file_dir_str}/ds_config_zero2.json",
            zero3=f"{self.test_file_dir_str}/ds_config_zero3.json",
        )

        # Load config files
        with open(self.ds_config_file[ZERO2], encoding="utf-8") as f:
            config_zero2 = json.load(f)
        with open(self.ds_config_file[ZERO3], encoding="utf-8") as f:
            config_zero3 = json.load(f)
            config_zero3["zero_optimization"]["stage3_gather_16bit_weights_on_model_save"] = False

        self.ds_config_dict = dict(zero2=config_zero2, zero3=config_zero3)

        self.dist_env = dict(
            ACCELERATE_USE_DEEPSPEED="true",
            MASTER_ADDR="localhost",
            MASTER_PORT="10999",
            RANK="0",
            LOCAL_RANK="0",
            WORLD_SIZE="1",
        )

    def test_gradient_accumulation_boundary_integration(self):
        """Test that gradient accumulation boundaries are automatically handled by DeepSpeed integration."""
        gradient_accumulation_steps = 4

        deepspeed_plugin = DeepSpeedPlugin(
            gradient_accumulation_steps=gradient_accumulation_steps,
            gradient_clipping=1.0,
            zero_stage=2,
            offload_optimizer_device="cpu",
            offload_param_device="cpu",
            zero3_save_16bit_model=False,
            zero3_init_flag=False,
        )

        with patch_environment(**self.dist_env):
            accelerator = Accelerator(mixed_precision="fp16", deepspeed_plugin=deepspeed_plugin)

            # Setup simple training components
            train_set = RegressionDataset(length=80)
            train_dataloader = DataLoader(train_set, batch_size=16, shuffle=True)
            model = AutoModel.from_pretrained(GPT2_TINY)
            optimizer = torch.optim.AdamW(model.parameters(), lr=5e-5)

            model, optimizer, train_dataloader = accelerator.prepare(model, optimizer, train_dataloader)

            model.train()

            # Test gradient accumulation with accumulate context manager
            batch_data = next(iter(train_dataloader))
            # Create proper input format for GPT2 model (RegressionDataset returns {"x": scalar, "y": scalar})
            # We need to create dummy input_ids for the GPT2 model
            batch_size = batch_data["x"].shape[0] if isinstance(batch_data["x"], torch.Tensor) else 1

            # Create dummy input_ids for GPT2 model and move to same device as model
            device = next(model.parameters()).device
            input_ids = torch.randint(0, 1000, (batch_size, 10), device=device)  # batch_size x sequence_length
            inputs = {"input_ids": input_ids}

            # Track sync_gradients values to verify correct gradient accumulation behavior
            sync_values = []

            # Simulate gradient accumulation steps
            for micro_step in range(gradient_accumulation_steps):
                with accelerator.accumulate(model):
                    sync_values.append(accelerator.sync_gradients)
                    outputs = model(**inputs)
                    # Use the last hidden state and create a simple loss
                    prediction = outputs.last_hidden_state.mean()
                    loss = prediction.sum()  # Simple scalar loss

                    # This should automatically handle gradient accumulation boundaries
                    accelerator.backward(loss)

                    if accelerator.sync_gradients:
                        optimizer.step()
                        optimizer.zero_grad()

            # Verify gradient accumulation pattern was correct
            # Should be False for first 3 steps, True for the last step
            expected_sync = [False, False, False, True]
            self.assertEqual(sync_values, expected_sync)

            # Reset step counter for accelerator
            accelerator.step = 0

    def test_clip_grad_norm_returns_deepspeed_grad_norm(self):
        """Test that clip_grad_norm_ works with DeepSpeed and returns gradient norm when available."""
        deepspeed_plugin = DeepSpeedPlugin(
            gradient_accumulation_steps=1,
            gradient_clipping=1.0,
            zero_stage=2,
            offload_optimizer_device="cpu",
            offload_param_device="cpu",
            zero3_save_16bit_model=False,
            zero3_init_flag=False,
        )

        with patch_environment(**self.dist_env):
            accelerator = Accelerator(mixed_precision="fp16", deepspeed_plugin=deepspeed_plugin)

            # Setup simple model
            model = AutoModel.from_pretrained(GPT2_TINY)
            optimizer = torch.optim.AdamW(model.parameters(), lr=5e-5)

            # Create a simple dataloader for prepare to work
            train_set = RegressionDataset(length=16)
            train_dataloader = DataLoader(train_set, batch_size=16, shuffle=True)

            model, optimizer, train_dataloader = accelerator.prepare(model, optimizer, train_dataloader)

            # Perform a forward and backward pass to generate gradients
            batch_data = next(iter(train_dataloader))
            batch_size = len(batch_data["x"]) if isinstance(batch_data["x"], torch.Tensor) else 1

            # Create dummy input_ids for GPT2 model and move to same device as model
            device = next(model.parameters()).device
            input_ids = torch.randint(0, 1000, (batch_size, 10), device=device)
            inputs = {"input_ids": input_ids}

            # Forward pass
            outputs = model(**inputs)
            prediction = outputs.last_hidden_state.mean()
            loss = prediction.sum()

            # Backward pass to generate gradients
            accelerator.backward(loss)

            # Test that gradient clipping works and returns a value
            grad_norm = accelerator.clip_grad_norm_(model.parameters(), max_norm=1.0)
            # After backward pass, we should get a valid gradient norm (either from DeepSpeed or fallback)
            self.assertIsInstance(grad_norm, (int, float, type(None)))
            if grad_norm is not None:
                self.assertGreaterEqual(grad_norm, 0.0)

    def test_accelerator_backward_passes_sync_gradients(self):
        """Test that Accelerator.backward() passes sync_gradients to DeepSpeed wrapper."""
        deepspeed_plugin = DeepSpeedPlugin(
            gradient_accumulation_steps=2,
            gradient_clipping=1.0,
            zero_stage=2,
            offload_optimizer_device="cpu",
            offload_param_device="cpu",
            zero3_save_16bit_model=False,
            zero3_init_flag=False,
        )

        with patch_environment(**self.dist_env):
            accelerator = Accelerator(mixed_precision="fp16", deepspeed_plugin=deepspeed_plugin)

            # Setup simple model and data
            model = AutoModel.from_pretrained(GPT2_TINY)
            optimizer = torch.optim.AdamW(model.parameters(), lr=5e-5)
            train_set = RegressionDataset(length=16)
            train_dataloader = DataLoader(train_set, batch_size=8, shuffle=True)

            model, optimizer, train_dataloader = accelerator.prepare(model, optimizer, train_dataloader)

            # Track sync_gradients values during backward calls
            sync_values = []

            # Test two gradient accumulation steps
            batch_data = next(iter(train_dataloader))
            # Create proper input format for GPT2 model
            batch_size = len(batch_data["x"]) if isinstance(batch_data["x"], torch.Tensor) else 1

            # Create dummy input_ids for GPT2 model and move to same device as model
            device = next(model.parameters()).device
            input_ids = torch.randint(0, 1000, (batch_size, 10), device=device)
            inputs = {"input_ids": input_ids}

            # First step - should have sync_gradients=False
            with accelerator.accumulate(model):
                sync_values.append(accelerator.sync_gradients)
                outputs = model(**inputs)
                prediction = outputs.last_hidden_state.mean()
                loss = prediction  # Simple loss
                accelerator.backward(loss)

            # Second step - should have sync_gradients=True
            with accelerator.accumulate(model):
                sync_values.append(accelerator.sync_gradients)
                outputs = model(**inputs)
                prediction = outputs.last_hidden_state.mean()
                loss = prediction  # Simple loss
                accelerator.backward(loss)

            # Verify sync_gradients pattern was correct
            self.assertEqual(len(sync_values), 2)
            self.assertFalse(sync_values[0])  # First step: not syncing
            self.assertTrue(sync_values[1])  # Second step: syncing
@@ -29,6 +29,7 @@ from accelerate.test_utils.testing import (
    get_launch_command,
    path_in_accelerate_package,
    require_fp16,
    require_fsdp2,
    require_multi_device,
    require_non_cpu,
    require_non_torch_xla,
@@ -37,7 +38,6 @@ from accelerate.test_utils.testing import (
)
from accelerate.utils import is_bf16_available, is_fp16_available, is_hpu_available, patch_environment, set_seed
from accelerate.utils.constants import (
    FSDP2_PYTORCH_VERSION,
    FSDP2_STATE_DICT_TYPE,
    FSDP_AUTO_WRAP_POLICY,
    FSDP_BACKWARD_PREFETCH,
@@ -46,7 +46,6 @@ from accelerate.utils.constants import (
)
from accelerate.utils.dataclasses import FullyShardedDataParallelPlugin
from accelerate.utils.fsdp_utils import disable_fsdp_ram_efficient_loading, enable_fsdp_ram_efficient_loading
from accelerate.utils.versions import is_torch_version


set_seed(42)
@@ -63,10 +62,6 @@ if is_fp16_available():
if is_bf16_available():
    dtypes.append(BF16)

FSDP_VERSIONS = [1]
if is_torch_version(">=", FSDP2_PYTORCH_VERSION):
    FSDP_VERSIONS.append(2)


@require_non_cpu
@require_non_torch_xla
@@ -90,24 +85,7 @@ class FSDPPluginIntegration(AccelerateTestCase):
            2: self.fsdp2_env,
        }

    def run(self, result=None):
        """Override run to get the current test name and format failures to include FSDP version."""
        test_method = getattr(self, self._testMethodName)
        orig_test_method = test_method

        def test_wrapper(*args, **kwargs):
            for fsdp_version in FSDP_VERSIONS:
                try:
                    self.current_fsdp_version = fsdp_version
                    return orig_test_method(*args, **kwargs)
                except Exception as e:
                    raise type(e)(f"FSDP version {fsdp_version}: {str(e)}") from e

        setattr(self, self._testMethodName, test_wrapper)
        try:
            return super().run(result)
        finally:
            setattr(self, self._testMethodName, orig_test_method)
        self.current_fsdp_version = 1

    def test_sharding_strategy(self):
        from torch.distributed.fsdp.fully_sharded_data_parallel import ShardingStrategy
@@ -421,6 +399,15 @@ class FSDPPluginIntegration(AccelerateTestCase):
        assert os.environ.get("FSDP_CPU_RAM_EFFICIENT_LOADING") == "False"


@require_fsdp2
@require_non_cpu
@require_non_torch_xla
class FSDP2PluginIntegration(FSDPPluginIntegration):
    def setUp(self):
        super().setUp()
        self.current_fsdp_version = 2


@run_first
# Skip this test when TorchXLA is available because accelerate.launch does not support TorchXLA FSDP.
@require_non_torch_xla
@@ -462,24 +449,7 @@ class FSDPIntegrationTest(TempDirTestCase):
        self.n_train = 160
        self.n_val = 160

    def run(self, result=None):
        """Override run to get the current test name and format failures to include FSDP version."""
        test_method = getattr(self, self._testMethodName)
        orig_test_method = test_method

        def test_wrapper(*args, **kwargs):
            for fsdp_version in FSDP_VERSIONS:
                try:
                    self.current_fsdp_version = fsdp_version
                    return orig_test_method(*args, **kwargs)
                except Exception as e:
                    raise type(e)(f"FSDP version {fsdp_version}: {str(e)}") from e

        setattr(self, self._testMethodName, test_wrapper)
        try:
            return super().run(result)
        finally:
            setattr(self, self._testMethodName, orig_test_method)
        self.current_fsdp_version = 1

    @require_fp16
    def test_performance(self):
@@ -633,3 +603,15 @@ class FSDPIntegrationTest(TempDirTestCase):
        )
        with patch_environment(omp_num_threads=1):
            execute_subprocess_async(cmd_config)


@require_fsdp2
@run_first
# Skip this test when TorchXLA is available because accelerate.launch does not support TorchXLA FSDP.
@require_non_torch_xla
@require_multi_device
@slow
class FSDP2IntegrationTest(FSDPIntegrationTest):
    def setUp(self):
        super().setUp()
        self.current_fsdp_version = 2

@@ -34,7 +34,6 @@ else:
    backend = "inductor"


@require_non_hpu
@require_huggingface_suite
class RegionalCompilationTester(unittest.TestCase):
    def _get_model_and_inputs(self):
@@ -109,6 +108,7 @@ class RegionalCompilationTester(unittest.TestCase):
        release_memory(model, full_compilation_model, regional_compilation_model)

    @slow
    @require_non_hpu
    @require_non_cpu
    @require_huggingface_suite
    def test_regional_compilation_inference_speedup(self):

@@ -239,7 +239,10 @@ class FeatureExamplesTests(TempDirTestCase):
        run_command(self.launch_args + testargs)

    @require_trackers
    @mock.patch.dict(os.environ, {"WANDB_MODE": "offline", "DVCLIVE_TEST": "true"})
    @mock.patch.dict(
        os.environ,
        {"WANDB_MODE": "offline", "DVCLIVE_TEST": "true", "SWANLAB_MODE": "offline"},
    )
    def test_tracking(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            testargs = f"""

@@ -12,9 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import json
import os
import tempfile
import textwrap
import unittest
from pathlib import Path

import torch

@@ -32,16 +36,18 @@ from accelerate.test_utils import (
from accelerate.test_utils.testing import require_deepspeed, run_command
from accelerate.utils import (
    AORecipeKwargs,
    FP8RecipeKwargs,
    TERecipeKwargs,
    has_ao_layers,
    has_transformer_engine_layers,
    is_torchao_available,
    is_transformer_engine_available,
)


def can_convert_te_model():
    accelerator_kwargs = {"mixed_precision": "fp8", "kwargs_handlers": [FP8RecipeKwargs(backend="TE")]}
def can_convert_te_model(from_config=False):
    if not from_config:
        accelerator_kwargs = {"mixed_precision": "fp8", "kwargs_handlers": [TERecipeKwargs()]}
    else:
        accelerator_kwargs = {}

    accelerator = Accelerator(**accelerator_kwargs)
    dataloader = torch.utils.data.DataLoader(torch.randn(10, 32), batch_size=2)
    model = torch.nn.Sequential(torch.nn.Linear(32, 32), torch.nn.Linear(32, 16))
@@ -58,10 +64,14 @@ def maintain_proper_deepspeed_config(expected_version):
    )


def can_convert_ao_model():
def can_convert_ao_model(from_config=False):
    from transformers import AutoModelForSequenceClassification

    accelerator_kwargs = {"mixed_precision": "fp8", "kwargs_handlers": [AORecipeKwargs()]}
    if not from_config:
        accelerator_kwargs = {"mixed_precision": "fp8", "kwargs_handlers": [AORecipeKwargs()]}
    else:
        accelerator_kwargs = {}

    accelerator = Accelerator(**accelerator_kwargs)
    dataloader = torch.utils.data.DataLoader(torch.randn(10, 32), batch_size=2)
    model = AutoModelForSequenceClassification.from_pretrained("bert-base-cased")
@@ -78,13 +88,31 @@ def can_convert_ao_model():
class TestTransformerEngine(unittest.TestCase):
    def test_can_prepare_model_single_gpu(self):
        command = get_launch_command(num_processes=1, monitor_interval=0.1)
        command += ["-m", "tests.test_fp8"]
        command += ["-m", "tests.test_fp8", "--test_te"]
        run_command(command)

    def test_can_prepare_model_single_gpu_from_config(self):
        with tempfile.TemporaryDirectory() as dir_name:
            config_file = Path(dir_name) / "config.yaml"
            config_file.write_text(
                textwrap.dedent(
                    """
                    distributed_type: "NO"
                    num_processes: 1
                    mixed_precision: fp8
                    fp8_config:
                      backend: TE
                    """
                )
            )
            command = get_launch_command(config_file=str(config_file), monitor_interval=0.1)
            command += ["-m", "tests.test_fp8", "--test_te", "--from_config"]
            run_command(command)

    @require_multi_device
    def test_can_prepare_model_multi_gpu(self):
        command = get_launch_command(num_processes=2, monitor_interval=0.1)
        command += ["-m", "tests.test_fp8"]
        command += ["-m", "tests.test_fp8", "--test_te"]
        run_command(command)

    @require_deepspeed
@@ -116,7 +144,7 @@ class TestTransformerEngine(unittest.TestCase):
        command = get_launch_command(
            num_processes=2, monitor_interval=0.1, use_deepspeed=True, deepspeed_config_file=ds_config
        )
        command += ["-m", "tests.test_fp8"]
        command += ["-m", "tests.test_fp8", "--test_te"]
        run_command(command)


@@ -125,13 +153,31 @@ class TestTransformerEngine(unittest.TestCase):
class TestTorchAO(unittest.TestCase):
    def test_can_prepare_model_single_accelerator(self):
        command = get_launch_command(num_processes=1, monitor_interval=0.1)
        command += ["-m", "tests.test_fp8"]
        command += ["-m", "tests.test_fp8", "--test_ao"]
        run_command(command)

    def test_can_prepare_model_single_gpu_from_config(self):
        with tempfile.TemporaryDirectory() as dir_name:
            config_file = Path(dir_name) / "config.yaml"
            config_file.write_text(
                textwrap.dedent(
                    """
                    distributed_type: "NO"
                    num_processes: 1
                    mixed_precision: fp8
                    fp8_config:
                      backend: AO
                    """
                )
            )
            command = get_launch_command(config_file=str(config_file), monitor_interval=0.1)
            command += ["-m", "tests.test_fp8", "--test_ao", "--from_config"]
            run_command(command)

    @require_multi_device
    def test_can_prepare_model_multi_accelerator(self):
        command = get_launch_command(num_processes=2, monitor_interval=0.1)
        command += ["-m", "tests.test_fp8"]
        command += ["-m", "tests.test_fp8", "--test_ao"]
        run_command(command)

    @require_deepspeed
@@ -163,16 +209,26 @@ class TestTorchAO(unittest.TestCase):
        command = get_launch_command(
            num_processes=2, monitor_interval=0.1, use_deepspeed=True, deepspeed_config_file=ds_config
        )
        command += ["-m", "tests.test_fp8"]
        command += ["-m", "tests.test_fp8", "--test_ao"]
        run_command(command)


if __name__ == "__main__":
    # TE suite
    if is_transformer_engine_available():
        can_convert_te_model()
    parser = argparse.ArgumentParser()
    parser.add_argument("--test_te", action="store_true", default=False)
    parser.add_argument("--test_ao", action="store_true", default=False)
    parser.add_argument("--from_config", action="store_true", default=False)
    args = parser.parse_args()

    if not args.test_te and not args.test_ao:
        raise ValueError("Must specify at least one of --test_te or --test_ao")

    if args.test_te:
        can_convert_te_model(args.from_config)
        if os.environ.get("ACCELERATE_USE_DEEPSPEED", "false") == "true":
            maintain_proper_deepspeed_config(int(os.environ.get("ZERO_STAGE")))

    # AO suite
    if is_torchao_available():
        can_convert_ao_model()
    if args.test_ao:
        can_convert_ao_model(args.from_config)

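The suites are now opt-in per backend, mirroring the updated test invocations. A sketch of launching one suite programmatically, using the same helpers the tests use:

    from accelerate.test_utils.testing import get_launch_command, run_command

    command = get_launch_command(num_processes=1, monitor_interval=0.1)
    command += ["-m", "tests.test_fp8", "--test_te"]  # or "--test_ao", optionally with "--from_config"
    run_command(command)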
@@ -61,7 +61,31 @@ class MemoryTest(unittest.TestCase):
            raise_fake_out_of_memory()

        mock_training_loop_function()
        assert batch_sizes == [128, 64, 32, 16, 8]
        assert batch_sizes == [
            128,
            115,
            103,
            92,
            82,
            73,
            65,
            58,
            52,
            46,
            41,
            36,
            32,
            28,
            25,
            22,
            19,
            17,
            15,
            13,
            11,
            9,
            8,
        ]

    def test_memory_explicit(self):
        batch_sizes = []
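The expected list is just repeated application of the new `int(batch_size * 0.9)` rule, starting from 128 until the mock stops raising at 8. A quick way to regenerate it:

    bs, seq = 128, [128]
    while bs > 8:
        bs = int(bs * 0.9)
        seq.append(bs)
    # seq == [128, 115, 103, 92, 82, 73, 65, 58, 52, 46, 41, 36, 32, 28, 25, 22, 19, 17, 15, 13, 11, 9, 8]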
@@ -75,7 +99,31 @@ class MemoryTest(unittest.TestCase):
            return batch_size, arg1

        bs, arg1 = mock_training_loop_function("hello")
        assert batch_sizes == [128, 64, 32, 16, 8]
        assert batch_sizes == [
            128,
            115,
            103,
            92,
            82,
            73,
            65,
            58,
            52,
            46,
            41,
            36,
            32,
            28,
            25,
            22,
            19,
            17,
            15,
            13,
            11,
            9,
            8,
        ]
        assert [bs, arg1] == [8, "hello"]

    def test_start_zero(self):

@@ -349,6 +349,26 @@ class ModelingUtilsTester(unittest.TestCase):

        check_device_map(model, {"linear1": 0, "linear2": 1, "batchnorm": 1})

    def test_check_device_map_invalid_keys(self):
        model = ModelForTest()

        device_map = {
            "linear1": "cpu",  # Valid module
            "batchnorm": "cpu",  # Valid module
            "linear2": "cpu",  # Valid module
            "invalid_module": 0,  # Invalid - should trigger warning
            "another_invalid": 1,  # Invalid - should trigger warning
        }

        # Test for the warning about invalid keys
        with self.assertWarns(UserWarning) as cm:
            check_device_map(model, device_map)

        warning_msg = str(cm.warning)
        self.assertIn("device_map keys do not match any submodules", warning_msg)
        self.assertIn("invalid_module", warning_msg)
        self.assertIn("another_invalid", warning_msg)

    def shard_test_model(self, model, tmp_dir):
        module_index = {
            "linear1": "checkpoint_part1.bin",

@@ -28,7 +28,6 @@ from accelerate.test_utils import (
    path_in_accelerate_package,
    require_huggingface_suite,
    require_multi_device,
    require_non_hpu,
    require_non_torch_xla,
    require_pippy,
    require_torchvision,
@@ -70,7 +69,6 @@ class MultiDeviceTester(unittest.TestCase):
        execute_subprocess_async(cmd)

    @run_first
    @require_non_hpu  # Synapse detected a device critical error that requires a restart
    @require_multi_device
    def test_multi_device_merge_fsdp_weights(self):
        print(f"Found {device_count} {torch_device} devices.")

@ -16,6 +16,7 @@ import csv
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import random
|
||||
import re
|
||||
import subprocess
|
||||
import tempfile
|
||||
@ -42,7 +43,9 @@ from accelerate.test_utils.testing import (
|
||||
require_matplotlib,
|
||||
require_mlflow,
|
||||
require_pandas,
|
||||
require_swanlab,
|
||||
require_tensorboard,
|
||||
require_trackio,
|
||||
require_wandb,
|
||||
skip,
|
||||
)
|
||||
@ -53,7 +56,9 @@ from accelerate.tracking import (
|
||||
DVCLiveTracker,
|
||||
GeneralTracker,
|
||||
MLflowTracker,
|
||||
SwanLabTracker,
|
||||
TensorBoardTracker,
|
||||
TrackioTracker,
|
||||
WandBTracker,
|
||||
)
|
||||
from accelerate.utils import (
|
||||
@ -520,6 +525,123 @@ class ClearMLTest(TempDirTestCase, MockingTestCase):
|
||||
self.assertCountEqual(plot["data"][0]["cells"]["values"], [[1, 2], [3, 4], [5, 6]])
|
||||
|
||||
|
||||
@require_swanlab
|
||||
@mock.patch.dict(os.environ, {"SWANLAB_MODE": "offline"})
|
||||
class SwanLabTrackingTest(TempDirTestCase, MockingTestCase):
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
# Setting Path where SwanLab parsed log files are saved via the SWANLAB_LOG_DIR env var
|
||||
self.add_mocks(mock.patch.dict(os.environ, {"SWANLAB_LOG_DIR": self.tmpdir}))
|
||||
|
||||
@skip
|
||||
def test_swanlab(self):
|
||||
# Disable hardware monitoring to prevent errors in test mode.
|
||||
import swanlab
|
||||
from swanlab.log.backup import BackupHandler
|
||||
from swanlab.log.backup.datastore import DataStore
|
||||
from swanlab.log.backup.models import ModelsParser
|
||||
|
||||
swanlab.merge_settings(swanlab.Settings(hardware_monitor=False))
        # Start a fake training session.
        accelerator = Accelerator(log_with="swanlab")
        project_name = "test_project_with_config"
        experiment_name = "test"
        description = "test project for swanlab"
        tags = ["my_tag"]
        config = {
            "epochs": 10,
            "learning_rate": 0.01,
            "offset": 0.1,
        }
        kwargs = {
            "swanlab": {
                "experiment_name": experiment_name,
                "description": description,
                "tags": tags,
            }
        }
        accelerator.init_trackers(project_name, config, kwargs)
        record_metrics = []
        record_scalars = []
        record_images_count = 0
        record_logs = []
        for epoch in range(1, swanlab.config.epochs):
            acc = 1 - 2**-epoch - random.random() / epoch - 0.1
            loss = 2**-epoch + random.random() / epoch + 0.1
            ll = swanlab.log(
                {
                    "accuracy": acc,
                    "loss": loss,
                    "image": swanlab.Image(np.random.random((3, 3, 3))),
                },
                step=epoch,
            )
            log = f"epoch={epoch}, accuracy={acc}, loss={loss}"
            print(log)
            record_scalars.extend([acc, loss])
            record_images_count += 1
            record_logs.append(log)
            record_metrics.extend(ll.values())
        accelerator.end_training()
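
        # Note: swanlab.log returns a mapping of metric name -> logged metric
        # record; the assertions below replay these records against what the
        # offline backup files contain.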

        # Load the latest offline log
        run_dir = swanlab.get_run().public.run_dir
        assert os.path.exists(run_dir)
        ds = DataStore()
        ds.open_for_scan(os.path.join(str(run_dir), BackupHandler.BACKUP_FILE))
        with ModelsParser() as models_parser:
            for record in ds:
                if record is None:
                    continue
                models_parser.parse_record(record)
        header, project, experiment, logs, runtime, columns, scalars, medias, footer = models_parser.get_parsed()

        # test file header
        assert header.backup_type == "DEFAULT"

        # test project info
        assert project.name == project_name
        assert project.workspace is None
        assert project.public is None

        # test experiment info
        assert experiment.name is not None
        assert experiment.description == description
        assert experiment.tags == tags

        # test log records
        backup_logs = [log.message for log in logs]
        for record_log in record_logs:
            assert record_log in backup_logs, f"Log not found in backup logs: {record_log}"

        # test runtime info
        runtime_info = runtime.to_file_model(os.path.join(str(run_dir), "files"))
        assert runtime_info.conda is None, "Not using conda, so this should be None"
        assert isinstance(runtime_info.requirements, str), "Requirements should be a string"
        assert isinstance(runtime_info.metadata, dict), "Metadata should be a dictionary"
        assert isinstance(runtime_info.config, dict), "Config should be a dictionary"
        for key in runtime_info.config:
            assert key in config, f"Config key {key} not found in original config"
            assert runtime_info.config[key]["value"] == config[key], (
                f"Config value for {key} does not match original value"
            )

        # test scalars and media
        assert len(scalars) + len(medias) == len(record_metrics), "Total metrics count does not match"
        backup_scalars = [
            metric.metric["data"]
            for metric in record_metrics
            if metric.column_info.chart_type.value.column_type == "FLOAT"
        ]
        assert len(backup_scalars) == len(scalars), "Total scalars count does not match"
        for scalar in backup_scalars:
            assert scalar in record_scalars, f"Scalar {scalar} not found in original scalars"
        backup_images = [
            metric for metric in record_metrics if metric.column_info.chart_type.value.column_type == "IMAGE"
        ]
        assert len(backup_images) == record_images_count, "Total images count does not match"
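
The user-facing pattern this test exercises reduces to a few lines. A sketch, with a placeholder project name and hyperparameters:

    from accelerate import Accelerator

    accelerator = Accelerator(log_with="swanlab")
    accelerator.init_trackers(
        project_name="my_project",
        config={"epochs": 10, "learning_rate": 0.01},
        init_kwargs={"swanlab": {"experiment_name": "demo", "tags": ["demo"]}},
    )
    accelerator.log({"train_loss": 0.5}, step=1)
    accelerator.end_training()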


class MyCustomTracker(GeneralTracker):
    """Basic tracker that writes to a csv for testing"""

@@ -681,6 +803,15 @@ class TrackerDeferredInitializationTest(unittest.TestCase):
        _ = Accelerator(log_with=tracker)
        self.assertNotEqual(PartialState._shared_state, {})

    @require_trackio
    def test_trackio_deferred_init(self):
        """Test that trackio tracker initialization doesn't initialize distributed"""
        PartialState._reset_state()
        tracker = TrackioTracker(run_name="test_trackio")
        self.assertEqual(PartialState._shared_state, {})
        _ = Accelerator(log_with=tracker)
        self.assertNotEqual(PartialState._shared_state, {})

    @require_comet_ml
    def test_comet_ml_deferred_init(self):
        """Test that CometML tracker initialization doesn't initialize distributed"""
@@ -728,3 +859,12 @@ class TrackerDeferredInitializationTest(unittest.TestCase):
        self.assertEqual(PartialState._shared_state, {})
        _ = Accelerator(log_with=tracker)
        self.assertNotEqual(PartialState._shared_state, {})

    @require_swanlab
    def test_swanlab_deferred_init(self):
        """Test that SwanLab tracker initialization doesn't initialize distributed"""
        PartialState._reset_state()
        tracker = SwanLabTracker(run_name="test_swanlab")
        self.assertEqual(PartialState._shared_state, {})
        _ = Accelerator(log_with=tracker)
        self.assertNotEqual(PartialState._shared_state, {})
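
The contract these deferred-initialization tests pin down, seen from user code: constructing a tracker object configures it but must not touch distributed state; only building the Accelerator initializes PartialState. A sketch using the TensorBoard tracker:

    from accelerate import Accelerator
    from accelerate.tracking import TensorBoardTracker

    tracker = TensorBoardTracker(run_name="exp1", logging_dir="./logs")  # no distributed init yet
    accelerator = Accelerator(log_with=tracker)  # process/device state is initialized here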