Merge branch 'master' into zhipwang_dev

Olatunji Ruwase
2025-08-12 06:23:28 -04:00
committed by GitHub
61 changed files with 1251 additions and 522 deletions

View File

@ -1,103 +0,0 @@
name: cpu-inference
on:
workflow_dispatch:
pull_request:
paths:
- '.github/workflows/cpu-inference.yml'
- 'requirements/**'
- 'deepspeed/__init__.py'
- 'deepspeed/inference/**'
- '!deepspeed/inference/v2/**' # exclude v2 dir
- 'tests/unit/inference/**'
- '!tests/unit/inference/v2/**' # exclude v2 tests dir
merge_group:
branches: [ master ]
schedule:
- cron: "0 0 * * 0"
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true
jobs:
unit-tests:
runs-on: [self-hosted, cpu]
env: {ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true} # Allow using Node16 actions
steps:
- uses: actions/checkout@v4
- id: setup-venv
uses: ./.github/workflows/setup-venv
- name: Install gcc-9
run: |
sudo add-apt-repository -u ppa:ubuntu-toolchain-r/test
sudo apt install -y gcc-9 g++-9
# set gcc-9 and g++-9 as the default
sudo update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-9 99
sudo update-alternatives --install /usr/bin/g++ g++ /usr/bin/g++-9 99
- name: Check gcc version
run: |
# Get gcc version
gcc --version
g++ --version
- name: Detect instruction sets on instance
run: |
lscpu
- name: Install numactl
run: |
sudo apt-get install -y numactl
- name: Install dependencies
run: |
pip install torch
# check installed version
pip list |grep \\\<torch\\\>
- name: Install oneCCL
run: |
pip install cmake
git clone https://github.com/oneapi-src/oneCCL
cd oneCCL
mkdir build
cd build
cmake ..
make -j install
- name: Install transformers
run: |
git clone https://github.com/huggingface/transformers
cd transformers
git rev-parse --short HEAD
pip install .
- name: Install deepspeed
run: |
# check why the host does not have AVX2 support
pip install .[dev,1bit,autotuning,inf]
ds_report
- name: Python environment check
run: |
pip list
source oneCCL/build/_install/env/setvars.sh
export LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libstdc++.so.6
# check whether the environment is properly setup
python -c "import deepspeed;from deepspeed.accelerator import get_accelerator;print(get_accelerator().device_name());print(get_accelerator().is_available())"
- name: Unit tests
run: |
# prep oneCCL for CCLBackend comm ops building
source oneCCL/build/_install/env/setvars.sh
export LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libstdc++.so.6
unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch
cd tests
# LOCAL_SIZE=2 forces the CPU accelerator to report 2 devices; this helps run the tests on the default GitHub runner
LOCAL_SIZE=2 COLUMNS=240 HF_HOME=~/tmp/hf_home/ TORCH_EXTENSIONS_DIR=./torch-extensions pytest -m 'seq_inference' unit/
LOCAL_SIZE=2 COLUMNS=240 HF_HOME=~/tmp/hf_home/ TORCH_EXTENSIONS_DIR=./torch-extensions pytest -m 'inference_ops' -m 'inference' unit/

View File

@ -33,7 +33,7 @@ jobs:
- name: Install pytorch
run: |
pip install torch torchvision --index-url https://download.pytorch.org/whl/cpu
pip install torch==2.7.1 torchvision==0.22.1 --index-url https://download.pytorch.org/whl/cpu
python -c "import torch; print('torch:', torch.__version__, torch)"
python -c "import torch; print('CUDA available:', torch.cuda.is_available())"
@ -59,5 +59,5 @@ jobs:
run: |
unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch
cd tests
HF_HOME=/tmp/hf_home/ pytest $PYTEST_OPTS --forked -n 4 unit/ --torch_ver="2.7"
HF_HOME=/tmp/hf_home/ pytest $PYTEST_OPTS --forked -m 'sequential' unit/ --torch_ver="2.7"
HF_HOME=/tmp/hf_home/ pytest $PYTEST_OPTS --forked -n 4 unit/ --torch_ver="2.7.1+cpu"
HF_HOME=/tmp/hf_home/ pytest $PYTEST_OPTS --forked -m 'sequential' unit/ --torch_ver="2.7.1+cpu"

.github/workflows/modal-accelerate.yml (new file, 99 lines)
View File

@ -0,0 +1,99 @@
name: modal-accelerate
# This CI is running on modal.com's GPUs.
#
# It's set up here on GitHub Actions and then the cloned repo is sent to Modal and everything
# happens on their hardware - see deepspeed/modal_ci/accelerate.py for where the actual VM is loaded, updated and the tests are
# run.
#
# Both files are annotated as to what's important and how one might change or update things if needed.
#
# Note that since this is a Required job we can't use `on.push.path` file filter - we are using
# collect-tests job to do the filtering for us so that the job can be skipped and satisfy the
# Required status for PRs to pass.
#
on:
workflow_dispatch:
push:
branches:
- master
pull_request:
paths-ignore:
- 'docs/**'
- 'blogs/**'
- 'deepspeed/inference/v2/**'
- 'tests/unit/inference/v2/**'
types: [draft, opened, ready_for_review, synchronize]
branches:
- master
concurrency:
group: ${{ github.workflow }}-${{ github.ref || github.run_id }}
cancel-in-progress: true
jobs:
collect-tests:
name: Collect tests to run
runs-on: ubuntu-latest
permissions:
contents: read
pull-requests: read
outputs:
deepspeed: ${{ steps.filter.outputs.deepspeed }}
steps:
- name: Checkout repository
uses: actions/checkout@v4
with:
lfs: true
- name: Filter changed files
uses: dorny/paths-filter@v2
id: filter
with:
token: ${{ secrets.GITHUB_TOKEN }}
filters: |
deepspeed:
- 'deepspeed/**'
- '.github/workflows/modal*.yml'
- 'ci/**'
- 'tests/unit/**'
- 'csrc/**'
deploy:
name: DeepSpeedAI CI
runs-on: ubuntu-latest
needs: collect-tests
env:
# these are created at https://modal.com/settings/deepspeedai/tokens
# they are then added to the repo's secrets at https://github.com/deepspeedai/deepspeed/settings/secrets/actions
MODAL_TOKEN_ID: ${{ secrets.MODAL_TOKEN_ID }}
MODAL_TOKEN_SECRET: ${{ secrets.MODAL_TOKEN_SECRET }}
# this one comes from https://huggingface.co/settings/profile of the bot user
# and it too is then updated at https://github.com/deepspeedai/deepspeed/settings/secrets/actions
HF_TOKEN: ${{ secrets.HF_TOKEN }}
if: needs.collect-tests.outputs.deepspeed == 'true'
steps:
- name: Checkout Repository
uses: actions/checkout@v4
with:
lfs: true
- name: Install Python
uses: actions/setup-python@v5
with:
python-version: "3.10"
cache: 'pip' # caching pip dependencies
- name: Install build dependencies
run: |
pip install uv # much faster than pip
uv pip install --system modal
- name: Run tests
run: |
modal run -m ci.accelerate

View File

@ -0,0 +1,99 @@
name: modal-torch-latest
# This CI is running on modal.com's GPUs.
#
# It's set up here on GitHub Actions and then the cloned repo is sent to Modal and everything
# happens on their hardware - see deepspeed/modal_ci/torch_latest.py for where the actual VM is loaded, updated and the tests are
# run.
#
# Both files are annotated as to what's important and how one might change or update things if needed.
#
# Note that since this is a Required job we can't use `on.push.path` file filter - we are using
# collect-tests job to do the filtering for us so that the job can be skipped and satisfy the
# Required status for PRs to pass.
#
on:
workflow_dispatch:
push:
branches:
- master
pull_request:
paths-ignore:
- 'docs/**'
- 'blogs/**'
- 'deepspeed/inference/v2/**'
- 'tests/unit/inference/v2/**'
types: [draft, opened, ready_for_review, synchronize]
branches:
- master
concurrency:
group: ${{ github.workflow }}-${{ github.ref || github.run_id }}
cancel-in-progress: true
jobs:
collect-tests:
name: Collect tests to run
runs-on: ubuntu-latest
permissions:
contents: read
pull-requests: read
outputs:
deepspeed: ${{ steps.filter.outputs.deepspeed }}
steps:
- name: Checkout repository
uses: actions/checkout@v4
with:
lfs: true
- name: Filter changed files
uses: dorny/paths-filter@v2
id: filter
with:
token: ${{ secrets.GITHUB_TOKEN }}
filters: |
deepspeed:
- 'deepspeed/**'
- '.github/workflows/modal*.yml'
- 'ci/**'
- 'tests/unit/**'
- 'csrc/**'
deploy:
name: DeepSpeedAI CI
runs-on: ubuntu-latest
needs: collect-tests
env:
# these are created at https://modal.com/settings/deepspeedai/tokens
# they are then added to the repo's secrets at https://github.com/deepspeedai/deepspeed/settings/secrets/actions
MODAL_TOKEN_ID: ${{ secrets.MODAL_TOKEN_ID }}
MODAL_TOKEN_SECRET: ${{ secrets.MODAL_TOKEN_SECRET }}
# this one comes from https://huggingface.co/settings/profile of the bot user
# and it too is then updated at https://github.com/deepspeedai/deepspeed/settings/secrets/actions
HF_TOKEN: ${{ secrets.HF_TOKEN }}
if: needs.collect-tests.outputs.deepspeed == 'true'
steps:
- name: Checkout Repository
uses: actions/checkout@v4
with:
lfs: true
- name: Install Python
uses: actions/setup-python@v5
with:
python-version: "3.10"
cache: 'pip' # caching pip dependencies
- name: Install build dependencies
run: |
pip install uv # much faster than pip
uv pip install --system modal
- name: Run tests
run: |
modal run -m ci.torch_latest

View File

@ -1,65 +0,0 @@
name: nv-h100
on:
workflow_dispatch:
schedule:
- cron: "0 0 * * *"
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true
permissions:
contents: read
issues: write
jobs:
unit-tests:
runs-on: [self-hosted, nvidia, h100]
container:
image: nvcr.io/nvidia/pytorch:23.03-py3
ports:
- 80
options: --gpus all --shm-size "8G"
steps:
- uses: actions/checkout@v4
- name: Check container state
run: |
nvidia-smi
python -c "import torch; print('torch:', torch.__version__, torch)"
python -c "import torch; print('CUDA available:', torch.cuda.is_available())"
- name: Install transformers
run: |
git clone https://github.com/huggingface/transformers
cd transformers
git rev-parse --short HEAD
python -m pip install .
- name: Install deepspeed
run: |
python -m pip install docutils==0.18.1 jinja2==3.0 urllib3==1.26.11 ninja
python -m pip install .[dev,1bit,autotuning]
ds_report
- name: Python environment
run: |
python -m pip list
- name: Unit tests
run: |
unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch
cd tests
python -m pytest $PYTEST_OPTS -n 4 unit/ --torch_ver="2.0" --cuda_ver="12"
python -m pytest $PYTEST_OPTS -m 'sequential' unit/ --torch_ver="2.0" --cuda_ver="12"
- name: Open GitHub issue if nightly CI fails
if: ${{ failure() && (github.event_name == 'schedule') }}
uses: JasonEtco/create-an-issue@v2
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:
filename: .github/ISSUE_TEMPLATE/ci_failure_report.md
update_existing: true

View File

@ -1,53 +0,0 @@
name: nv-human-eval
on:
workflow_dispatch:
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true
jobs:
unit-tests:
runs-on: [self-hosted, nvidia, a6000]
container:
image: nvcr.io/nvidia/pytorch:24.12-py3
ports:
- 80
options: --gpus all --shm-size "8G"
steps:
- uses: actions/checkout@v4
- name: Check container state
run: |
ldd --version
nvcc --version
nvidia-smi
python -c "import torch; print('torch:', torch.__version__, torch)"
python -c "import torch; print('CUDA available:', torch.cuda.is_available())"
- name: Install transformers
run: |
git clone --depth=1 https://github.com/huggingface/transformers
cd transformers
git rev-parse --short HEAD
python -m pip install .
- name: Clone Human Eval
run: |
git clone --depth=1 https://github.com/openai/human-eval.git
sed -i '/exec(check_program, exec_globals)/ s/^# //' human-eval/human_eval/execution.py
cd human-eval
git rev-parse --short HEAD
python -m pip install .
- name: Install deepspeed
run: |
python -m pip install .[dev,1bit,autotuning]
ds_report
- name: Python environment
run: |
python -m pip list
- name: Unit tests
run: |
unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch
cd tests
python -m pytest --color=yes --durations=0 --verbose -rF -m 'evaluation' -k "test_human_eval" unit/ --torch_ver="2.6" --cuda_ver="12"

View File

@ -6,6 +6,7 @@
[![Twitter](https://img.shields.io/twitter/follow/DeepSpeedAI)](https://twitter.com/intent/follow?screen_name=DeepSpeedAI)
[![Japanese Twitter](https://img.shields.io/badge/%E6%97%A5%E6%9C%AC%E8%AA%9ETwitter-%40DeepSpeedAI_JP-blue)](https://twitter.com/DeepSpeedAI_JP)
[![Chinese Zhihu](https://img.shields.io/badge/%E7%9F%A5%E4%B9%8E-%E5%BE%AE%E8%BD%AFDeepSpeed-blue)](https://www.zhihu.com/people/deepspeed)
[![Slack](https://img.shields.io/badge/Slack-4A154B?style=for-the-badge&logo=slack&logoColor=white)](https://join.slack.com/t/deepspeedworkspace/shared_invite/zt-3a8pjd8dd-PCj2hMvR4Y2syPwVnjEoww)
<div align="center">
@ -126,9 +127,9 @@ DeepSpeed has been integrated with several different popular open-source DL fram
| Description | Status |
| ----------- | ------ |
| NVIDIA | [![nv-torch110-p40](https://github.com/deepspeedai/DeepSpeed/actions/workflows/nv-torch110-p40.yml/badge.svg?branch=master)](https://github.com/deepspeedai/DeepSpeed/actions/workflows/nv-torch110-p40.yml) [![nv-torch110-v100](https://github.com/deepspeedai/DeepSpeed/actions/workflows/nv-torch110-v100.yml/badge.svg?branch=master)](https://github.com/deepspeedai/DeepSpeed/actions/workflows/nv-torch110-v100.yml) [![nv-torch-latest-v100](https://github.com/deepspeedai/DeepSpeed/actions/workflows/nv-torch-latest-v100.yml/badge.svg?branch=master)](https://github.com/deepspeedai/DeepSpeed/actions/workflows/nv-torch-latest-v100.yml) [![nv-h100](https://github.com/deepspeedai/DeepSpeed/actions/workflows/nv-h100.yml/badge.svg?branch=master)](https://github.com/deepspeedai/DeepSpeed/actions/workflows/nv-h100.yml) [![nv-inference](https://github.com/deepspeedai/DeepSpeed/actions/workflows/nv-inference.yml/badge.svg?branch=master)](https://github.com/deepspeedai/DeepSpeed/actions/workflows/nv-inference.yml) [![nv-nightly](https://github.com/deepspeedai/DeepSpeed/actions/workflows/nv-nightly.yml/badge.svg?branch=master)](https://github.com/deepspeedai/DeepSpeed/actions/workflows/nv-nightly.yml) |
| NVIDIA | [![nv-torch-latest-v100](https://github.com/deepspeedai/DeepSpeed/actions/workflows/nv-torch-latest-v100.yml/badge.svg?branch=master)](https://github.com/deepspeedai/DeepSpeed/actions/workflows/nv-torch-latest-v100.yml) [![nv-inference](https://github.com/deepspeedai/DeepSpeed/actions/workflows/nv-inference.yml/badge.svg?branch=master)](https://github.com/deepspeedai/DeepSpeed/actions/workflows/nv-inference.yml) [![nv-nightly](https://github.com/deepspeedai/DeepSpeed/actions/workflows/nv-nightly.yml/badge.svg?branch=master)](https://github.com/deepspeedai/DeepSpeed/actions/workflows/nv-nightly.yml) |
| AMD | [![amd-mi200](https://github.com/deepspeedai/DeepSpeed/actions/workflows/amd-mi200.yml/badge.svg?branch=master)](https://github.com/deepspeedai/DeepSpeed/actions/workflows/amd-mi200.yml) |
| CPU | [![torch-latest-cpu](https://github.com/deepspeedai/DeepSpeed/actions/workflows/cpu-torch-latest.yml/badge.svg?branch=master)](https://github.com/deepspeedai/DeepSpeed/actions/workflows/cpu-torch-latest.yml) [![cpu-inference](https://github.com/deepspeedai/DeepSpeed/actions/workflows/cpu-inference.yml/badge.svg?branch=master)](https://github.com/deepspeedai/DeepSpeed/actions/workflows/cpu-inference.yml) |
| CPU | [![torch-latest-cpu](https://github.com/deepspeedai/DeepSpeed/actions/workflows/cpu-torch-latest.yml/badge.svg?branch=master)](https://github.com/deepspeedai/DeepSpeed/actions/workflows/cpu-torch-latest.yml) |
| Intel Gaudi | [![hpu-gaudi2](https://github.com/deepspeedai/DeepSpeed/actions/workflows/hpu-gaudi2.yml/badge.svg?branch=master)](https://github.com/deepspeedai/DeepSpeed/actions/workflows/hpu-gaudi2.yml) |
| Intel XPU | [![xpu-max1100](https://github.com/deepspeedai/DeepSpeed/actions/workflows/xpu-max1100.yml/badge.svg?branch=master)](https://github.com/deepspeedai/DeepSpeed/actions/workflows/xpu-max1100.yml) |
| PyTorch Nightly | [![nv-torch-nightly-v100](https://github.com/deepspeedai/DeepSpeed/actions/workflows/nv-torch-nightly-v100.yml/badge.svg?branch=master)](https://github.com/deepspeedai/DeepSpeed/actions/workflows/nv-torch-nightly-v100.yml) |

View File

@ -64,7 +64,7 @@ class CUDA_Accelerator(DeepSpeedAccelerator):
return torch.cuda.nccl.version()
def device(self, device_index=None):
return torch.cuda.device(device_index)
return torch.device('cuda', device_index)
def set_device(self, device_index):
torch.cuda.set_device(device_index)

View File

@ -67,7 +67,7 @@ def get_accelerator():
f"XPU_Accelerator requires intel_extension_for_pytorch, which is not installed on this system.")
elif accelerator_name == "xpu.external":
try:
import intel_extension_for_deepspeed # noqa: F401 # type: ignore
from intel_extension_for_deepspeed import XPU_Accelerator # noqa: F401 # type: ignore
except ImportError as e:
raise ValueError(
f"XPU_Accelerator external requires intel_extension_for_deepspeed, which is not installed on this system."
@ -224,6 +224,12 @@ def get_accelerator():
ds_accelerator = CPU_Accelerator()
elif accelerator_name == "xpu.external":
# XPU_Accelerator is already imported in detection stage
try:
from intel_extension_for_deepspeed import XPU_Accelerator # noqa: F811
except ImportError as e:
raise ValueError(
f"XPU_Accelerator external requires intel_extension_for_deepspeed, which is not installed on this system."
)
ds_accelerator = XPU_Accelerator()
elif accelerator_name == "xpu":
from .xpu_accelerator import XPU_Accelerator
@ -258,7 +264,7 @@ def get_accelerator():
def set_accelerator(accel_obj):
global ds_accelerator
_validate_accelerator(accel_obj)
if accel_logger is not None:
if accel_logger is not None and accel_obj is not None:
accel_logger.info(f"Setting ds_accelerator to {accel_obj._name} (model specified)")
ds_accelerator = accel_obj

View File

@ -0,0 +1,183 @@
<p align="center">
<img height="250" src="./images/zenflow-logo.png" alt="zenflow logo"/>
</p>
<div align="center">
# ZenFlow: Stall-Free Offloading Engine for LLM Training
<div align="center">
<img src="./images/zenflow-overview.png" alt="" width="1200" />
<div align="left">
*Figure 1: ZenFlow is DeepSpeed's stall-free offloading engine for LLM training. It decouples GPU and CPU updates by prioritizing important gradients for immediate GPU updates and deferring the rest for asynchronous CPU-side accumulation. By fully overlapping CPU work and PCIe transfers with GPU computation, ZenFlow eliminates stalls and achieves high hardware utilization across both single-GPU and multi-GPU settings.*
## Table of Contents
- [ZenFlow: Stall-Free Offloading Engine for LLM Training](#zenflow-stall-free-offloading-engine-for-llm-training)
- [Table of Contents](#table-of-contents)
- [Introduction](#introduction)
- [ZenFlow at a Glance](#zenflow-at-a-glance)
- [ZenFlow Highlights](#zenflow-highlights)
- [Design Motivation](#design-motivation)
- [ZenFlow Design](#zenflow-design)
- [Getting Started: Try out DeepSpeed-ZenFlow](#getting-started-try-out-deepspeed-zenflow)
- [Citation](#citation)
- [Acknowledgements](#acknowledgements)
---
## Introduction
<div align="center">
<img src="./images/zero-offload-stall.png" alt="" width="600" />
<div align="left">
*Figure 2: ZeRO-Offload causes repeated GPU stalls due to blocking CPU updates and PCIe transfers, leading to >60% idle time per step when training Llama 2-7B on 4× A100s.*
Offloading has become a standard approach to scale fine-tuning of large language models (LLMs) beyond GPU memory limits. Frameworks like ZeRO-Offload reduce GPU memory usage by pushing gradients and optimizer states to the CPU. However, they also create a new bottleneck: expensive GPUs often sit idle, waiting on slow CPU updates and PCIe data transfers. In practice, enabling offloading when training Llama 2-7B on 4× A100 GPUs can inflate each step from 0.5s to over 7s—a 14× slowdown.
<div align="center">
<img src="./images/zenflow-example.png" alt="" width="1200" />
<div align="left">
*Figure 3: In ZeRO-Offload, CPU-side optimizer updates and PCIe transfers dominate iteration time, leaving the GPU idle for over 5 seconds.*
**ZenFlow** addresses this bottleneck with a stall-free training pipeline. It prioritizes high-impact gradients for immediate GPU updates, while offloading the rest to the CPU and applying them asynchronously. These deferred CPU updates are fully overlapped with GPU compute, eliminating stalls and significantly improving throughput. Best of all, ZenFlow maintains the same model accuracy and integrates seamlessly with DeepSpeed.
---
## ZenFlow at a Glance
- **Zero GPU stalls:** Top-k important gradients are updated immediately on GPU; low-priority gradients are asynchronously processed on CPU—no GPU wait time.
- **Asynchronous and bounded:** ZenFlow decouples CPU and GPU execution with a bounded-staleness strategy that preserves convergence.
- **Auto-tuned:** ZenFlow adapts update intervals at runtime based on gradient dynamics—no need to tune manually.
---
## ZenFlow Highlights
ZenFlow is the **first offloading framework** to offer a **bounded-asynchronous** update scheme that preserves convergence while delivering **up to 5× end-to-end speed-up** over ZeRO-Offload.
### Performance
| Feature | Benefit |
|--------|---------|
| Up to **5×** end-to-end speed-up over ZeRO-Offload and **6.3×** over ZeRO-Infinity | Faster time-to-convergence |
| **> 85% reduction in GPU stalls** on A100 / H100 nodes | Keeps GPUs busy, higher utilization |
| **≈ 2× lower PCIe traffic** (1.13× model size per step vs. 2× in ZeRO) | Less bandwidth pressure on clusters |
| **Maintains or improves accuracy** on GLUE (OPT-350M → Llama-13B) | No accuracy loss |
| **Lightweight gradient selection** (6000× cheaper than full AllGather) | Scales to multi-GPU settings without memory footprint spikes |
| **Auto-tuning (Zen-auto)** automatically adapts update interval on-the-fly | No manual knob tuning |
For more detailed performance results, please refer to our [arXiv paper](https://arxiv.org/abs/2505.12242).
---
## Design Motivation
Training large models with offloading can save GPU memory, but often at the cost of *performance*. In this section, we briefly discuss three topics. **First**, we explain why coupling CPU-side optimizer updates with GPU compute leads to severe GPU stalls during LLM fine-tuning. **Next**, we quantify how full-gradient offloading saturates the limited PCIe bandwidth on A100/H100 servers, inflating iteration time. **Finally**, we reveal the highly skewed importance distribution of gradients, showing that uniformly updating all parameters in GPUs at the same time is wasteful and unnecessary.
### Offloading-Induced GPU Stalls
<div align="center">
<img src="./images/zenflow-no-overlap.png" alt="" width="1200" />
<div align="left">
*Figure 4: CPU updates dominate step time, causing >60% GPU idle due to poor overlap with compute.*
Synchronous offloading frameworks (e.g., ZeRO-Offload) keep the GPU idle while the CPU performs a full optimizer step and transfers updated parameters back to GPU. For Llama-2-7B with 4× A100, the CPU path can take **longer than 4s** while the backward pass takes **approximately 2s**, so **over 60% of each iteration is pure GPU wait time**. Eliminating this serialization is essential for achieving high GPU utilization.
### Bandwidth Bottlenecks
A single training step moves a full copy of the model gradients from GPU to CPU and a full copy of the model parameters back, i.e., **2× model size of PCIe traffic per step**. Even on PCIe 4.0 (≈ 32 GB/s), Llama-2-13B pushes ~40 GB per iteration, adding **> 1s** of transfer latency.
### Unequal Gradient Importance
Not all gradients matter equally. Our analysis shows that **the top 1% of gradient channels contribute over 90% of the ℓ²-norm energy** during fine-tuning. In other words, most updates have little impact on model learning, yet still incur disproportionately high compute and I/O costs in traditional offloading pipelines.
This skew in gradient importance opens the door to a better design: update critical gradients on GPU right away, and defer the rest for asynchronously batched, lower-priority updates on CPU. ZenFlow turns this idea into a principled, efficient training engine.
<div align="center">
<img src="./images/zenflow-gradients.png" alt="" width="1200" />
<div align="left">
*Figure 5: Top 1% of gradients may contribute over 85% of gradient norms.*
---
## ZenFlow Design
ZenFlow is designed around three key ideas that separate critical and non-critical gradient updates while minimizing communication bottlenecks. Here's how we break the tight coupling between GPU and CPU computation to create a **stall-free** pipeline.
### Idea 1: Importance-Aware Top-k Gradient Update
Not all gradients are equally impactful for training. ZenFlow introduces an **importance-aware** design that prioritizes updates for the top-k most significant gradients. These gradients are updated directly on the GPU, using its high compute bandwidth. This approach allows us to **reduce the size of the per-step gradient update** by nearly **50%**, cutting down the communication load by around 2×.
For the rest of the gradients, which contribute less to the model's learning, ZenFlow batches them and performs asynchronous updates on the CPU. These updates are **deferred** until they are sufficiently accumulated, thereby reducing the impact on training speed.
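To make the split concrete, here is a minimal, illustrative sketch (not DeepSpeed's implementation; `importance_aware_update`, `cpu_accum`, `topk_fraction`, and the plain-SGD update are assumptions made for the example):
```python
# Illustrative only: split one parameter's gradient into a top-k slice that is
# updated immediately on GPU and a remainder accumulated on CPU for a later,
# asynchronous update. Not DeepSpeed code; all names are hypothetical.
import torch

def importance_aware_update(param, cpu_accum, topk_fraction=0.01, lr=1e-4):
    grad = param.grad                             # lives on GPU, shape [rows, cols]
    col_score = grad.pow(2).sum(dim=0)            # per-column squared-norm proxy
    k = max(1, int(topk_fraction * grad.shape[1]))
    topk_cols = torch.topk(col_score, k).indices  # "critical" columns

    # 1) immediate GPU update for the important columns (plain SGD for brevity)
    param.data[:, topk_cols] -= lr * grad[:, topk_cols]

    # 2) defer the rest: accumulate into a CPU buffer for an asynchronous step
    rest = torch.ones(grad.shape[1], dtype=torch.bool, device=grad.device)
    rest[topk_cols] = False
    cpu_accum[:, rest.cpu()] += grad[:, rest].to("cpu", non_blocking=True)
```
In the real engine the deferred slice would be consumed by the asynchronous CPU optimizer described next.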
### Idea 2: Bounded-Asynchronous CPU Accumulation
ZenFlow's **asynchronous accumulation** allows the CPU to stay busy while the GPU performs other computations. We apply an **accumulation window** for the non-critical gradients, allowing them to accumulate over several iterations before updating. This gives ZenFlow the ability to process **multiple rounds of gradient updates** concurrently, eliminating idle time typically spent waiting for the CPU optimizer.
By carefully coordinating CPU updates with GPU execution, ZenFlow **fully hides CPU execution** behind GPU computation—ensuring that GPUs remain actively utilized, avoiding stalls, and **maximizing hardware efficiency**.
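A rough sketch of the accumulation-window idea follows, with an assumed `cpu_optimizer` object and a plain Python thread standing in for ZenFlow's overlapped CPU worker:
```python
# Toy bounded-staleness scheduler (hypothetical names, not DeepSpeed's API):
# non-critical gradients accumulate for `update_interval` steps, then a CPU
# optimizer step runs in a background thread so the GPU never waits on it.
import threading

class DeferredCPUUpdater:
    def __init__(self, cpu_optimizer, update_interval=4):
        self.cpu_optimizer = cpu_optimizer    # assumed to expose .step()
        self.update_interval = update_interval
        self.step_count = 0
        self.worker = None

    def maybe_launch(self):
        self.step_count += 1
        if self.step_count % self.update_interval != 0:
            return
        if self.worker is not None:
            self.worker.join()                # bound staleness to one window
        self.worker = threading.Thread(target=self.cpu_optimizer.step)
        self.worker.start()                   # overlaps with GPU fwd/bwd
```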
### Idea 3: Lightweight Gradient Selection
A key challenge in distributed training is **selecting important gradients** without introducing prohibitive communication and GPU memory costs. Traditional systems rely on global synchronization (via `AllGather`) to gather full gradients, which can become a major bottleneck in multi-GPU settings.
ZenFlow solves this with a **lightweight gradient proxy**: instead of transferring full gradients, ZenFlow uses a **per-column gradient norm** to approximate the importance of each gradient. By computing a compact summary of per-column gradients (e.g., squared norms), ZenFlow reduces communication volume by more than **4,000×**—with nearly no loss in accuracy.
This approach allows ZenFlow to **scale efficiently across GPUs**, without high memory or communication overhead, and it supports **dynamic gradient selection** as the model evolves.
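Under these assumptions, the proxy amounts to reducing a vector of per-column squared norms instead of the gradients themselves; a minimal sketch (illustrative, not DeepSpeed code):
```python
# Lightweight selection proxy: each rank contributes a hidden_size-long vector
# of per-column squared norms, which is all-reduced so every rank picks the
# same top-k columns without ever gathering full gradients.
import torch
import torch.distributed as dist

def select_topk_columns(grad: torch.Tensor, k: int) -> torch.Tensor:
    col_norm_sq = grad.pow(2).sum(dim=0)                    # compact summary, [cols]
    if dist.is_available() and dist.is_initialized():
        dist.all_reduce(col_norm_sq, op=dist.ReduceOp.SUM)  # tiny message
    return torch.topk(col_norm_sq, k).indices               # identical on all ranks
```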
### Putting It All Together: ZenFlow's Zero-Stall Pipeline
<div align="center">
<img src="./images/zenflow-workflow.png" alt="" width="1200" />
<div align="left">
*Figure 6: ZenFlow's stall-free pipeline overlaps CPU updates and transfers with multi-step GPU compute.*
1. **Forward/Backward Pass on GPU:** ZenFlow processes the forward and backward passes on the GPU, immediately updating the **top-k gradients** on the GPU without waiting for the CPU.
2. **Gradient Transfer to CPU:** While the GPU is busy, gradients from the current iteration (or previous ones) are transferred to the CPU over a dedicated PCIe stream. This is done in parallel with GPU computation, without causing any GPU wait time.
3. **CPU Update:** Once a batch of non-critical gradients has accumulated, the CPU performs the update asynchronously. This update typically spans multiple GPU iterations, but is hidden behind GPU work, making it virtually invisible to the overall pipeline.
4. **Double Buffering:** ZenFlow uses **double buffering** to manage the newly updated gradients. When the CPU update is complete, the new parameters are transferred back to the GPU. The swap is as fast as a pointer flip—no need to reload the entire model or re-launch the kernel.
By constantly **overlapping GPU computation with CPU-side work**, ZenFlow transforms the traditional compute → wait → update cycle into a continuous, **stall-free pipeline**.
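The double-buffering step (item 4 above) can be pictured with a toy class; this is a hedged illustration, not DeepSpeed's actual buffer management:
```python
# Toy double buffer: the GPU reads `live` while the asynchronously refreshed
# parameters land in `staging`; committing is a reference swap, not a copy.
import torch

class ParamDoubleBuffer:
    def __init__(self, shape, device="cuda"):
        self.live = torch.zeros(shape, device=device)
        self.staging = torch.zeros(shape, device=device)

    def stage(self, new_params_cpu):
        # H2D copy into the buffer the GPU is not currently reading
        self.staging.copy_(new_params_cpu, non_blocking=True)

    def commit(self):
        # the "pointer flip": swap which buffer is considered live
        self.live, self.staging = self.staging, self.live
```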
---
## Getting Started: Try out DeepSpeed-ZenFlow
To try out DeepSpeed-ZenFlow, please refer to the [ZenFlow tutorial](https://github.com/deepspeedai/DeepSpeedExamples/blob/master/training/DeepSpeed-ZenFlow/README.md) in our DeepSpeedExamples repo.
---
## Citation
```bibtex
@article{lan2025zenflow,
title = {ZenFlow: Enabling Stall-Free Offloading Training via Asynchronous Updates},
author = {Tingfeng Lan and Yusen Wu and Bin Ma and Zhaoyuan Su and Rui Yang and Tekin Bicer and Masahiro Tanaka and Olatunji Ruwase and Dong Li and Yue Cheng},
journal = {arXiv preprint arXiv:2505.12242},
year = {2025}
}
```
---
## Acknowledgements
This work is the result of a close collaboration between the University of Virginia (UVA), University of California, Merced (UC Merced), Argonne National Laboratory (ANL), and the DeepSpeed team.
The contributors include [Tingfeng Lan](https://antlera.github.io/), [Yusen Wu](https://joshwoo2003.github.io/), [Zhaoyuan Su](https://alexsssu.github.io/), [Rui Yang](https://ruiyang00.github.io/), and [Yue Cheng](https://tddg.github.io/) from UVA; [Bin Ma](https://www.linkedin.com/in/bin-ma-ba665b182/) and [Dong Li](https://faculty.ucmerced.edu/dong-li/) from UC Merced; [Tekin Bicer](https://www.anl.gov/profile/tekin-bicer) from ANL; [Olatunji Ruwase](https://www.linkedin.com/in/tunji-ruwase-088952/) and [Masahiro Tanaka](https://www.linkedin.com/in/masahiro-tanaka-77482926/) from the DeepSpeed team. We especially thank [Olatunji Ruwase](https://www.linkedin.com/in/tunji-ruwase-088952/) and [Masahiro Tanaka](https://www.linkedin.com/in/masahiro-tanaka-77482926/) for their early feedback and insightful discussions and also for open-source community support.

Binary files not shown: 7 new image files added (sizes: 513 KiB, 907 KiB, 105 KiB, 337 KiB, 3.0 MiB, 329 KiB, 206 KiB).

ci/__init__.py (new file, 4 lines)
View File

@ -0,0 +1,4 @@
# Copyright (c) DeepSpeed Team.
# SPDX-License-Identifier: Apache-2.0
# DeepSpeed Team

ci/accelerate.py (new file, 43 lines)
View File

@ -0,0 +1,43 @@
# Copyright (c) Snowflake.
# SPDX-License-Identifier: Apache-2.0
# DeepSpeed Team
from pathlib import Path
import modal
ROOT_PATH = Path(__file__).parents[1]
# yapf: disable
image = (modal.Image
.from_registry("pytorch/pytorch:2.6.0-cuda12.4-cudnn9-devel", add_python="3.10")
.run_commands("apt update && apt install -y libaio-dev")
.apt_install("git")
.run_commands("uv pip install --system --compile-bytecode datasets==3.6.0")
.run_commands(
"git clone https://github.com/huggingface/accelerate && \
uv pip install --system --compile-bytecode ./accelerate[testing]"
)
.pip_install_from_requirements(ROOT_PATH / "requirements/requirements.txt", gpu="any")
.pip_install_from_requirements(ROOT_PATH / "requirements/requirements-dev.txt", gpu="any")
.add_local_dir(ROOT_PATH , remote_path="/root/", copy=True)
.run_commands("pip install /root")
.add_local_dir(ROOT_PATH / "accelerator", remote_path="/root/deepspeed/accelerator")
.add_local_dir(ROOT_PATH / "csrc", remote_path="/root/deepspeed/ops/csrc")
.add_local_dir(ROOT_PATH / "op_builder", remote_path="/root/deepspeed/ops/op_builder")
)
app = modal.App("deepspeedai-accelerate-ci", image=image)
@app.function(
gpu="l40s:1",
timeout=1800,
)
def pytest():
import subprocess
subprocess.run(
"pytest /accelerate/tests/deepspeed".split(),
check=True,
cwd=ROOT_PATH / ".",
)

ci/torch_latest.py (new file, 39 lines)
View File

@ -0,0 +1,39 @@
# Copyright (c) Snowflake.
# SPDX-License-Identifier: Apache-2.0
# DeepSpeed Team
from pathlib import Path
import modal
ROOT_PATH = Path(__file__).parents[1]
# yapf: disable
image = (modal.Image
.from_registry("pytorch/pytorch:2.6.0-cuda12.4-cudnn9-devel", add_python="3.10")
.run_commands("apt update && apt install -y libaio-dev")
.pip_install_from_requirements(ROOT_PATH / "requirements/requirements.txt", gpu="any")
.pip_install_from_requirements(ROOT_PATH / "requirements/requirements-dev.txt", gpu="any")
.add_local_dir(ROOT_PATH , remote_path="/root/", copy=True)
.run_commands("pip install /root")
.add_local_dir(ROOT_PATH / "accelerator", remote_path="/root/deepspeed/accelerator")
.add_local_dir(ROOT_PATH / "csrc", remote_path="/root/deepspeed/ops/csrc")
.add_local_dir(ROOT_PATH / "op_builder", remote_path="/root/deepspeed/ops/op_builder")
)
app = modal.App("deepspeedai-torch-latest-ci", image=image)
@app.function(
gpu="l40s:2",
timeout=1800,
)
def pytest():
import subprocess
subprocess.run(
"pytest -n 4 --verbose tests/unit/runtime/zero/test_zero.py tests/unit/runtime/half_precision/test_bf16.py --torch_ver=2.6 --cuda_ver=12.4".split(),
check=True,
cwd=ROOT_PATH / ".",
)

View File

@ -81,7 +81,7 @@ class Autotuner:
if not os.path.exists(self.results_dir):
try:
os.makedirs(self.results_dir, exist_ok=True)
logger.info(f"Created autotuning results directory: {self.exps_dir}")
logger.info(f"Created autotuning results directory: {self.results_dir}")
except:
logger.error(
f"Failed to create {self.results_dir}, please check results_dir in the autotuning config file is accessible by all the nodes in the job."

View File

@ -144,7 +144,7 @@ DEFAULT_MIN_MEM_CONFIG = {
"zero_optimization": {
"stage": 3
},
"memory_break_down": False
"memory_breakdown": False
}
DEFAULT_TUNING_SPACE_ZERO_0 = {"zero_optimization": {"stage": 0}}

View File

@ -120,11 +120,12 @@ class DeepSpeedCheckpoint(object):
self.global_state[ITERATION_KEY] = sd.get(ITERATION_KEY, 0)
self.global_state[ARGS_KEY] = sd.get(ARGS_KEY, None)
def get_zero_checkpoint_state(self, pp_index, tp_index, dp_index) -> dict:
def get_zero_checkpoint_state(self, pp_index, tp_index, dp_index, strip_tensor_paddings: bool = True) -> dict:
return self.zero_checkpoint.get_state_for_rank(pp_index=pp_index,
tp_index=tp_index,
dp_index=dp_index,
keys_to_ignore=[PARAM_SHAPES])
keys_to_ignore=[PARAM_SHAPES],
strip_tensor_paddings=strip_tensor_paddings)
def get_zero_files(self, pp_index, tp_index, dp_index) -> list:
return self.zero_checkpoint.get_files_for_rank(pp_index=pp_index, tp_index=tp_index, dp_index=dp_index)

View File

@ -77,27 +77,12 @@ class CCLBackend(TorchBackend):
return CCLHandler(self.ccl_comm_op)
def all_reduce(self, tensor, op=ReduceOp.SUM, group=None, async_op=False):
use_caching = False
if use_caching:
match_id = f"{tensor.size()}-{op}"
name = "all_reduce_caching"
if name in self.available_coll:
group = self.get_all_ranks_from_group(group)
return self.ccl_comm_op.all_reduce_caching(tensor, op, match_id, group, async_op)
else:
return self.run_collective(name=name,
tensor=tensor,
op=op,
match_id=match_id,
group=group,
async_op=async_op)
name = "all_reduce"
if name in self.available_coll:
group = self.get_all_ranks_from_group(group)
return self.ccl_comm_op.all_reduce(tensor, op, group, async_op)
else:
name = "all_reduce"
if name in self.available_coll:
group = self.get_all_ranks_from_group(group)
return self.ccl_comm_op.all_reduce(tensor, op, group, async_op)
else:
return self.run_collective(name=name, tensor=tensor, op=op, group=group, async_op=async_op)
return self.run_collective(name=name, tensor=tensor, op=op, group=group, async_op=async_op)
def inference_all_reduce(self, tensor, op=ReduceOp.SUM, group=None):
name = "inference_all_reduce"

View File

@ -5,6 +5,8 @@
import deepspeed
from deepspeed import utils
from packaging import version
import inspect
from .utils import *
from .backend import *
@ -145,11 +147,21 @@ class TorchBackend(Backend):
def init_process_group(self, backend, timeout, init_method, rank, world_size):
if not torch.distributed.is_initialized():
torch.distributed.init_process_group(backend,
timeout=timeout,
init_method=init_method,
rank=rank,
world_size=world_size)
kwargs = dict(
timeout=timeout,
init_method=init_method,
rank=rank,
world_size=world_size,
)
# 1. device_id arg was added in torch==2.3
# 2. setting device_id leads to hanging in 2.6.0<torch<2.7.1 https://github.com/pytorch/pytorch/issues/153960
if 'device_id' in inspect.signature(torch.distributed.init_process_group).parameters and not (
version.parse("2.6.0") < version.parse(torch.__version__) < version.parse("2.7.1")):
local_rank = int(os.environ.get('LOCAL_RANK', 0))
kwargs.update(device_id=get_accelerator().device(local_rank))
torch.distributed.init_process_group(backend, **kwargs)
self.using_mpi = torch.distributed.get_backend() == 'mpi'
@disable_compiler_collective

View File

@ -101,7 +101,7 @@ def reload_activation_bwd(graph: Graph, graph_id: int, graph_order: List[int], m
with graph.inserting_after(reload_node):
wait_node = graph.create_node('call_function',
torch.ops.dc.wait_reload.default, (reload_node, graph_id, val_id), {},
name=f"wait_copy_{node.name}_{val_id}")
name=f"wait_copy_{reload_node.name}_{val_id}")
# replace all uses of node with wait_node
users = {}

View File

@ -137,7 +137,7 @@ def module_replacement(model, module_name, compression_technique=None, mpu=None)
else:
new_module = None
if compression_technique is not None:
if compression_technique is not None and new_module is not None:
for k, v in compression_technique.items():
if k == SPARSE_PRUNING:
if v[SPARSE_PRUNING_ENABLED]:

View File

@ -37,12 +37,16 @@ def shard_qkv_param(param: torch.Tensor,
if n_heads_kv is not None and n_heads_q is None:
raise ValueError("n_heads_kv should not be passed without n_heads_q")
if param is None:
raise ValueError("param should not be None")
if n_heads_q is None:
# Guaranteed to be in MHA
if param.shape[0] // 3 % head_size != 0:
raise ValueError("MHA param shape is not correct")
n_heads_q = param.shape[0] // head_size // 3
mha_sharding = True
elif n_heads_kv is None:
mha_sharding = True
else:
mha_sharding = n_heads_q == n_heads_kv
@ -73,9 +77,6 @@ def shard_qkv_param(param: torch.Tensor,
else:
even_kv_sharding = n_heads_kv >= num_shards
if param is None:
return None
q_param = param[:head_size * n_heads_q]
kv_param = param[head_size * n_heads_q:]

View File

@ -122,9 +122,9 @@ class DSSequenceDescriptor(BaseSequenceDescriptor):
self._seen_tokens = 0
self._in_flight_tokens = 0
assert kv_cache_ids_shadow is not None # add check before use
self._num_allocation_groups = tuple(kv_cache_ids_shadow.shape[0]
for kv_cache_ids_shadow in kv_cache_ids_shadow)
self._num_allocation_groups = tuple(kv_cache_id.shape[0] for kv_cache_id in kv_cache_ids_shadow)
self._blocks_per_allocation_group = tuple(
torch.zeros(num_groups, dtype=torch.int32, device="cpu") for num_groups in self._num_allocation_groups)

View File

@ -101,7 +101,7 @@ class PDSHRunner(MultiNodeRunner):
f"--master_port={self.args.master_port}"
]
if self.args.venv_script is not None:
deepspeed_launch = [f"source {self.args.venv_script}"] + deepspeed_launch
deepspeed_launch = [f"source {self.args.venv_script};"] + deepspeed_launch
if self.args.no_python:
deepspeed_launch.append("--no_python")
if self.args.module:

View File

@ -133,6 +133,9 @@ class DeepSpeedTransformerInference(nn.Module):
if "hidden_states" in kwargs:
input = kwargs["hidden_states"]
if layer_past is not None and past_key_value is not None:
raise ValueError("Only one of `layer_past` or `past_key_value` can be present.")
input_mask = (input_mask if attn_mask is None else attn_mask) if attention_mask is None else attention_mask
self.allocate_workspace(input.size())
@ -143,7 +146,7 @@ class DeepSpeedTransformerInference(nn.Module):
# We set the prev key/value to None when there is a prompt
if input.shape[1] > 1:
self.layer_past = None
layer_past = layer_past if layer_past is not None else self.layer_past
_layer_past = layer_past or past_key_value or self.layer_past
head_mask = layer_head_mask if layer_head_mask is not None else head_mask
attn_mask = None
@ -162,7 +165,7 @@ class DeepSpeedTransformerInference(nn.Module):
self.attention(input,
input_mask,
head_mask,
layer_past,
_layer_past,
get_present,
encoder_hidden_states,
encoder_attention_mask,
@ -173,7 +176,7 @@ class DeepSpeedTransformerInference(nn.Module):
**kwargs)
presents = (key, value)
self.layer_past = presents if layer_past is None else None
self.layer_past = presents if layer_past is None and past_key_value is None else None
output = self.mlp(attention_output, input, inp_norm, self.attention.attn_ob)
if not self.config.pre_layer_norm:

View File

@ -34,7 +34,10 @@ class DS_LLAMAContainer(MetaTensorContainer, HybridGatedMLPContainer, HybridSpli
_config.rotate_half = True
_config.rotate_every_two = False
_config.rotary_dim = self.hidden_size // self.num_attention_heads
_config.rope_theta = self.policy.client_module.self_attn.rope_theta
if hasattr(self.policy.client_module.self_attn, 'config'):
_config.rope_theta = self.policy.client_module.self_attn.config.rope_theta
else:
_config.rope_theta = self.policy.client_module.self_attn.rope_theta
self.module = DeepSpeedGPTInference(_config, mp_group=self.mp_group)
return self.module
@ -128,9 +131,13 @@ class LLAMALayerPolicy(TransformerPolicy):
LLAMALayerPolicy._orig_layer_class = None
def get_hidden_heads(self):
if hasattr(self.client_module.self_attn, 'config'):
num_heads = self.client_module.self_attn.config.num_attention_heads
else:
num_heads = self.client_module.self_attn.num_heads
hidden_heads = (
self.client_module.self_attn.q_proj.in_features,
self.client_module.self_attn.num_heads,
num_heads,
self.client_module.input_layernorm.variance_epsilon,
self.client_module.mlp.gate_proj.out_features,
)

View File

@ -73,6 +73,8 @@ class MegatronLayerPolicy(TransformerPolicy):
attention = self.client_module.attention
else:
attention = self.client_module.self_attention
else:
return None
return attention.query_key_value.weight, \
attention.query_key_value.bias, \

View File

@ -93,8 +93,10 @@ def generic_injection(module, dtype=None, enable_cuda_graph=True):
return child
if len(policy_attn) == 5:
qkvw, attn_ow, attn_ob, hidden_size, heads = policy_attn
qw, kw, vw = torch.empty(0), torch.empty(0), torch.empty(0)
else:
qw, kw, vw, attn_ow, attn_ob, hidden_size, heads = policy_attn
qkvw = torch.empty(0)
config = transformer_inference.DeepSpeedInferenceConfig(
hidden_size=hidden_size,
@ -113,11 +115,15 @@ def generic_injection(module, dtype=None, enable_cuda_graph=True):
return data
if len(policy_attn) == 5:
assert qkvw is not None and qkvw.data is not None, "qkvw can't be None"
attn_module.attn_qkvw.data = transpose(qkvw.data)
else:
attn_module.attn_qkvw = None
assert qw is not None and qw.data is not None, "qw can't be None"
attn_module.attn_qw.data = transpose(qw.data)
assert kw is not None and kw.data is not None, "kw can't be None"
attn_module.attn_kw.data = transpose(kw.data)
assert vw is not None and vw.data is not None, "vw can't be None"
attn_module.attn_vw.data = transpose(vw.data)
attn_module.attn_qkvb = None
@ -316,21 +322,15 @@ def replace_transformer_layer(orig_layer_impl, model, checkpoint_dict, config, m
return _autotp._replace_module(module)
def replace_fn(child, _policy, layer_id=0, prefix="", state_dict=None):
training = False # todo: refactor this part to go in the config
if training:
# copy relevant state from child -> new module
new_module = replace_with_policy(child, _policy, config.triangular_masking)
# copy relevant state from child -> new module
if not is_autotp_training_mode() and config.replace_with_kernel_inject:
new_module = replace_with_policy(child,
_policy,
config.triangular_masking,
inference=True,
layer_id=layer_id)
else:
# copy relevant state from child -> new module
if not is_autotp_training_mode() and config.replace_with_kernel_inject:
new_module = replace_with_policy(child,
_policy,
config.triangular_masking,
inference=True,
layer_id=layer_id)
else:
new_module = replace_wo_policy(child, _policy, prefix=prefix, state_dict=state_dict)
new_module = replace_wo_policy(child, _policy, prefix=prefix, state_dict=state_dict)
return new_module

View File

@ -400,7 +400,7 @@ def topkgating(
me = torch.mean(gates, dim=0)
ce = torch.mean(mask.float(), dim=0)
l_aux = torch.mean(me * ce) * num_experts * num_experts / k
locations = None
if drop_tokens:
# Calculate configured capacity and remove locations outside capacity from mask
capacity = _capacity(gates, torch.tensor(capacity_factor * k), torch.tensor(min_capacity))
@ -437,6 +437,8 @@ def topkgating(
denom_s = torch.clamp(gates_s, min=torch.finfo(gates_masked.dtype).eps)
gates_masked = gates_masked / denom_s
if locations is None:
raise ValueError(f"Locations is not set: {locations}")
# dispatch_mask
locations_sc = _one_hot_to_float((locations * mask), capacity)

View File

@ -128,18 +128,18 @@ def _kernel(A, B, C, stride_za, stride_ha, stride_ma, stride_ka, stride_zb, stri
inc_b = TK * stride_kb
else:
pinc += 2
if meta['DSD']:
inc_b = tl.load(pinc)
inc_a = tl.load(pinc + 1)
inc_b = tl.multiple_of(inc_b, 8)
inc_a = tl.multiple_of(inc_a, 8)
inc_b = inc_b * stride_kb
if meta['DDS']:
inc_a = tl.load(pinc)
inc_b = tl.load(pinc + 1)
inc_a = tl.multiple_of(inc_a, 8)
inc_b = tl.multiple_of(inc_b, 8)
inc_a = inc_a * stride_ka
if meta['DSD']:
inc_b = tl.load(pinc)
inc_a = tl.load(pinc + 1)
inc_b = tl.multiple_of(inc_b, 8)
inc_a = tl.multiple_of(inc_a, 8)
inc_b = inc_b * stride_kb
if meta['DDS']:
inc_a = tl.load(pinc)
inc_b = tl.load(pinc + 1)
inc_a = tl.multiple_of(inc_a, 8)
inc_b = tl.multiple_of(inc_b, 8)
inc_a = inc_a * stride_ka
pa += inc_a
pb += inc_b
# pre-fetch

View File

@ -57,7 +57,7 @@ class DeepSpeedDiffusersTransformerBlock(nn.Module):
self.attn_2.do_out_bias = False
self.attn_2_bias = self.attn_2.attn_ob
else:
self.attn_2_bias = nn.Paramaeter(torch.zeros_like(self.norm3_g), requires_grad=False)
self.attn_2_bias = nn.Parameter(torch.zeros_like(self.norm3_g), requires_grad=False)
self.gated_activation = GatedActivationOp()
self.layer_norm = LayerNormOp()

View File

@ -335,27 +335,29 @@ class DeepSpeedTransformerLayer(nn.Module):
self.norm_b = nn.Parameter(torch.Tensor(self.config.hidden_size))
self.init_transformer_weights(self.config.adjust_init_range)
else:
# For testing only.
q = initial_weights[0].data
k = initial_weights[1].data
v = initial_weights[2].data
if initial_weights is not None:
# For testing only.
q = initial_weights[0].data
k = initial_weights[1].data
v = initial_weights[2].data
self.attn_qkvw = nn.Parameter(torch.cat((q, k, v)))
#self.attn_qkvw[i * self.config.hidden_size:(i + 1) * self.config.hidden_size] = \
# initial_weights[i].clone()
#torch.empty_like(initial_weights[i]).data.copy_(initial_weights[i].data)
self.attn_qkvb = nn.Parameter(torch.Tensor(self.config.hidden_size * 3))
self.attn_qkvb.data.zero_()
self.attn_ow = initial_weights[3]
self.attn_ob = initial_biases[3]
self.attn_nw = initial_weights[4]
self.attn_nb = initial_biases[4]
self.inter_w = initial_weights[5]
self.inter_b = initial_biases[5]
self.output_w = initial_weights[6]
self.output_b = initial_biases[6]
self.norm_w = initial_weights[7]
self.norm_b = initial_biases[7]
self.attn_qkvw = nn.Parameter(torch.cat((q, k, v)))
#self.attn_qkvw[i * self.config.hidden_size:(i + 1) * self.config.hidden_size] = \
# initial_weights[i].clone()
#torch.empty_like(initial_weights[i]).data.copy_(initial_weights[i].data)
self.attn_qkvb = nn.Parameter(torch.Tensor(self.config.hidden_size * 3))
self.attn_qkvb.data.zero_()
self.attn_ow = initial_weights[3]
self.attn_nw = initial_weights[4]
self.inter_w = initial_weights[5]
self.output_w = initial_weights[6]
self.norm_w = initial_weights[7]
if initial_biases is not None:
self.attn_ob = initial_biases[3]
self.attn_nb = initial_biases[4]
self.inter_b = initial_biases[5]
self.output_b = initial_biases[6]
self.norm_b = initial_biases[7]
# Load cuda modules if needed
global transformer_cuda_module, stochastic_transformer_cuda_module

View File

@ -715,7 +715,7 @@ class DistributedDataAnalyzer(object):
buffer = torch.cat(tensor_list, dim=0).to(self.device)
write_buffer_to_file(buffer, 0, builder)
elif self.worker_id == 0 and src > 0: # rank 0 receives other rank's data and writes it
buffer = torch.empty(sizes[src].item(), dtype=buffer.dtype, device=buffer.device)
buffer = torch.empty(sizes[src].item(), dtype=numpy_dtype, device=self.device)
err = dist.recv(buffer, src=src, group=self.comm_group, tag=src)
assert err == src and len(buffer) > 0, "recv failed"
write_buffer_to_file(buffer, src, builder)

View File

@ -407,7 +407,6 @@ class DeepSpeedEngine(Module):
for _, module in self.module.named_modules():
if isinstance(module, LoRAOptimizedLinear):
self.optimized_linear_lora_enabled = True
offload_ratio = None
if offload_ratio is not None:
assert offload_ratio == module.lora_config.offload_ratio, \
"all lora_config offload ratios should be the same across the model"
@ -730,6 +729,15 @@ class DeepSpeedEngine(Module):
raise ValueError(f'not yet support')
#self.lr_scheduler = lr_schedules.WarmupLayerTokenDecayLR(self.optimizer, self.random_ltd_scheduler)
def get_data_parallel_rank(self):
return groups.get_data_parallel_rank()
def get_tensor_parallel_rank(self):
return groups.get_tensor_model_parallel_rank()
def get_model_parallel_rank(self):
return groups.get_model_parallel_rank()
def get_sequence_parallel_group(self):
return self.seq_parallel_group
@ -1253,10 +1261,6 @@ class DeepSpeedEngine(Module):
@staticmethod
def __check_params(model: Module, dtype: torch.dtype) -> None:
return
if not all(param.dtype == dtype for param in model.parameters()) and dist.get_rank() == 0:
raise ValueError(f"{dtype} is enabled but the following parameters have dtype that is "
f"not {dtype}: "
f"{[(n, p.dtype) for n, p in model.named_parameters() if p.dtype != dtype]}")
def _set_client_model(self, model):
# register client model in _modules so that nn.module methods work correctly

View File

@ -13,7 +13,6 @@ from deepspeed import comm as dist
from deepspeed.utils import logger
from deepspeed.utils.timer import ThroughputTimer
from deepspeed.accelerator import get_accelerator
from deepspeed.runtime.bf16_optimizer import BF16_Optimizer
from ..engine import DeepSpeedEngine, MEMORY_OPT_ALLREDUCE_SIZE
@ -535,6 +534,9 @@ class PipelineEngine(DeepSpeedEngine):
"""True if this process is in the last stage in the pipeline."""
return self.stage_id == self.num_stages - 1
def get_pipeline_parallel_rank(self):
return self.stage_id
def _reduce_outputs(self, outputs, reduce='avg', reduce_dp=True, micro_batches=None):
if reduce is None:
return outputs
@ -709,7 +711,6 @@ class PipelineEngine(DeepSpeedEngine):
def _exec_forward_pass(self, buffer_id):
self.tput_timer.start()
self.mem_status('BEFORE FWD', reset_max=True)
if isinstance(self.pipe_buffers['inputs'][buffer_id], tuple):
inputs = tuple(t.clone() for t in self.pipe_buffers['inputs'][buffer_id])
@ -805,13 +806,10 @@ class PipelineEngine(DeepSpeedEngine):
assert self.optimizer is not None, "must provide optimizer during " \
"init in order to use backward"
self.mem_status('BEFORE BWD', reset_max=True)
# The last stage just runs backward on the loss using DeepSpeed's typical
# mechanisms.
if self.is_last_stage():
super().backward(self.loss)
self.mem_status('AFTER BWD')
return
outputs = self.pipe_buffers['outputs'][buffer_id]
@ -878,8 +876,6 @@ class PipelineEngine(DeepSpeedEngine):
self.timers(BACKWARD_MICRO_TIMER).stop()
self.timers(BACKWARD_GLOBAL_TIMER).stop()
self.mem_status('AFTER BWD')
def _exec_load_micro_batch(self, buffer_id):
if self.wall_clock_breakdown():
self.timers(BATCH_INPUT_TIMER).start()
@ -1218,14 +1214,11 @@ class PipelineEngine(DeepSpeedEngine):
if self.wall_clock_breakdown():
self.timers(STEP_MICRO_TIMER).start()
self.timers(STEP_GLOBAL_TIMER).start()
self.mem_status('BEFORE STEP', reset_max=True)
self._force_grad_boundary = True
self._take_model_step(lr_kwargs)
self._force_grad_boundary = False
self.mem_status('AFTER STEP')
if self.global_rank == 0 and self.monitor.enabled:
self.summary_events = [(f'Train/Samples/lr', self.get_lr()[0], self.global_samples)]
if self.fp16_enabled() and hasattr(self.optimizer, 'cur_scale'):
@ -1304,53 +1297,6 @@ class PipelineEngine(DeepSpeedEngine):
"""Disabled for pipeline parallel training. See ``train_batch()``. """
raise PipelineError("Only train_batch() is accessible in pipeline mode.")
def mem_status(self, msg, print_rank=-1, reset_max=False):
return
global mem_alloced, mem_cached
if not self.global_steps == 0 or not self.global_steps == 9:
#return
pass
if self.mpu.get_data_parallel_rank() != 0:
return
if self.global_rank != 0:
return
rank = self.global_rank
if print_rank != -1 and rank != print_rank:
return
get_accelerator().synchronize()
if reset_max:
get_accelerator().reset_max_memory_cached()
get_accelerator().reset_max_memory_allocated()
new_alloced = get_accelerator().memory_allocated()
new_cached = get_accelerator().memory_cached()
delta_alloced = new_alloced - mem_alloced
delta_cached = new_cached - mem_cached
mem_cached = new_cached
mem_alloced = new_alloced
max_alloced = get_accelerator().max_memory_allocated()
max_cached = get_accelerator().max_memory_cached()
# convert to GB for printing
new_alloced /= 1024**3
new_cached /= 1024**3
delta_alloced /= 1024**3
delta_cached /= 1024**3
max_alloced /= 1024**3
max_cached /= 1024**3
print(
f'RANK={rank} STAGE={self.stage_id} STEP={self.global_steps} MEMSTATS', msg,
f'current alloc={new_alloced:0.4f}GB (delta={delta_alloced:0.4f}GB max={max_alloced:0.4f}GB) '
f'current cache={new_cached:0.4f}GB (delta={delta_cached:0.4f}GB max={max_cached:0.4f}GB)')
def module_state_dict(self, exclude_frozen_parameters=False):
"""Override hack to save a pipe model and return the directory path of the save.

View File

@ -18,10 +18,11 @@ ALST features found in this module:
- `UlyssesSPDataLoaderAdapter` - DL adapter to shard the normal DL batches to be used by `UlyssesSPAttentionHF`
- `SequenceTiledCompute` - generic autograd function to perform compute after tiling on the sequence dimension
- `TiledMLP` - a specific autograd function to perform tiled MLP (it's much easier to understand before trying to grok `SequenceTiledCompute`)
- `TiledFusedLogitsLoss` - a specific autograd function to perform loss computation without manifesting the full logits tensor and instead computing loss on shards of logits.
This module implements Arctic Long Sequence Training: Scalable And Efficient Training For Multi-Million Token Sequences: https://arxiv.org/abs/2506.13996
For integration docs see: https://www.deepspeed.ai/tutorials/ulysses-alst-sequence-pallellism/
For integration docs see: https://www.deepspeed.ai/tutorials/ulysses-alst-sequence-parallelism/
The other ALST features live inside
https://github.com/snowflakedb/ArcticTraining/blob/main/projects/sequence-parallelism/
@ -360,14 +361,15 @@ class UlyssesSPAttentionHF(torch.nn.Module):
# we don't have the model yet at this stage
hf_model_config = AutoConfig.from_pretrained(model_name_or_path)
if core_attn_implementation not in ["flash_attention_2", "sdpa"]:
supported_attn_implementation = ["flash_attention_2", "flash_attention_3", "sdpa"]
if core_attn_implementation not in supported_attn_implementation:
# notes on the excluded ones:
# - eager: The problem is that `eager` wants an attention_mask and it creates the wrong attention mask it seems if we don't provide one - it's possible that we could somehow solve this, but it's also unlikely someone will want to use the slow eager attention with sequence parallelism
# - flex_attention: haven't tried
raise ValueError(
f"{core_attn_implementation} attn_implementation isn't currently supported by Ulysses sequence"
" parallelism. Set core_attn_implementation arg to either 'flash_attention_2' or 'sdpa'.")
f" parallelism. Set core_attn_implementation arg to one of {supported_attn_implementation}.")
if core_attn_implementation not in ALL_ATTENTION_FUNCTIONS:
raise ValueError(
@ -495,8 +497,12 @@ class UlyssesSPDataLoaderAdapter:
return self.micro_batches.pop(0)
def refill(self):
# this will raise StopIteration when empty
batch = next(self.iter)
# reset the iterator if StopIteration arrives, and re-raise it to allow multiple epochs to run
try:
batch = next(self.iter)
except StopIteration:
self.iter = iter(self.dl)
raise StopIteration
micro_batches = defaultdict(dict)
# XXX: replace with more efficient all-to-all?
@ -639,6 +645,7 @@ class SequenceTiledCompute(torch.autograd.Function):
ctx.grad_requiring_tensor_key = grad_requiring_tensor_key
ctx.compute_params = [p for p in compute_params if p.requires_grad]
ctx.output_unshard_dimension = output_unshard_dimension
ctx.output_reduction = output_reduction
with torch.no_grad():
args = list(args)
@ -685,6 +692,7 @@ class SequenceTiledCompute(torch.autograd.Function):
shards = ctx.shards
kwargs_to_shard = ctx.kwargs_to_shard
kwargs_to_pass = ctx.kwargs_to_pass
output_reduction = ctx.output_reduction
grad_requiring_tensor_key = ctx.grad_requiring_tensor_key
grad_requiring_tensor_key_index = ctx.grad_requiring_tensor_key_index
@ -698,20 +706,21 @@ class SequenceTiledCompute(torch.autograd.Function):
grad_requiring_tensor.requires_grad_(grad_requiring_tensor_requires_grad)
incoming_grad = grads[0]
grad_requiring_tensor_grad = torch.zeros_like(grad_requiring_tensor)
# since the reduction of outputs is performed outside of the `autograd.backward` call below, we need to pre-adjust the incoming gradient: in the case of "sum" the gradient is 1.0, in the case of "mean" it's 1.0/num_elements, which here is 1/shards.
if output_reduction == "mean":
incoming_grad /= shards
kwargs_to_shard_shards = {
k: list(torch.chunk(kwargs_to_shard[k], chunks=shards, dim=1))
for k in kwargs_to_shard.keys()
}
if grad_requiring_tensor.shape[0] == 1:
grad_requiring_tensor_grad = torch.zeros_like(grad_requiring_tensor)
else:
grad_requiring_tensor_grad = torch.empty_like(grad_requiring_tensor)
kwargs_to_shard_shards = {k: list(torch.chunk(v, chunks=shards, dim=1)) for k, v in kwargs_to_shard.items()}
# if seqlen is not exactly divisible by shards the last step will be shorter than shard_step
shard_step = kwargs_to_shard_shards[grad_requiring_tensor_key][0].shape[1]
for i in range(shards):
# when fn involves one or more model weights deepspeed will normally push a grad to
# reduce per sub-module call, so since we only want it to add a grad for the last
# shard's call , we signal to zero not to add new gradients to reduce until the last
# shard's call, we signal to ZeRO not to add new gradients to reduce until the last
# shard when all gradients have been accumulated. An example for such a call is
# `model.lm_head(hidden_states)`
if compute_params is not None:
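The 1/shards pre-scaling explained in the comment above can be verified with a tiny standalone check: a mean taken shard by shard reproduces the gradient of the full mean only if each shard's backward receives an incoming gradient scaled by 1/shards (assuming equal shard sizes):
import torch
x = torch.rand(8, requires_grad=True)
x.mean().backward()
# d(mean)/dx_i == 1/numel
assert torch.allclose(x.grad, torch.full_like(x, 1.0 / x.numel()))
shards = 2
x2 = torch.rand(8, requires_grad=True)
for shard in torch.chunk(x2, chunks=shards):
    # feed each shard's backward the pre-scaled incoming gradient
    shard.mean().backward(torch.tensor(1.0 / shards))
assert torch.allclose(x2.grad, torch.full_like(x2, 1.0 / x2.numel()))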
@ -723,16 +732,21 @@ class SequenceTiledCompute(torch.autograd.Function):
for param in compute_params:
param.ds_grad_is_ready = True
kwargs_to_shard_shard = {k: kwargs_to_shard_shards[k].pop(0) for k in kwargs_to_shard_shards.keys()}
kwargs_to_shard_shard = {k: v[i] for k, v in kwargs_to_shard_shards.items()}
grad_requiring_tensor_shard = kwargs_to_shard_shard[grad_requiring_tensor_key]
grad_requiring_tensor_shard.requires_grad_(grad_requiring_tensor_requires_grad)
shard_offset = i * shard_step
# this will enable gradual population of the pre-allocated
# `grad_requiring_tensor_shard.grad` during `torch.autograd.backward` calls
grad_requiring_tensor_shard.grad = (grad_requiring_tensor_grad.narrow(
1, shard_offset, shard_step).view_as(grad_requiring_tensor_shard))
# if seqlen is not exactly divisible by shards the last step will be shorter than shard_step
shard_step = kwargs_to_shard_shards[grad_requiring_tensor_key][i].shape[1]
shard_offset = i * kwargs_to_shard_shards[grad_requiring_tensor_key][0].shape[1]
if grad_requiring_tensor.shape[0] == 1:
# with dim0==1 (bs) a `narrow` leaves the shard's stride unaffected, so we use the most efficient `narrow` alias:
# this will enable gradual population of the pre-allocated
# `grad_requiring_tensor_shard.grad` during `torch.autograd.backward` calls
grad_requiring_tensor_shard.grad = grad_requiring_tensor_grad.narrow(
1, shard_offset, shard_step).view_as(grad_requiring_tensor_shard)
with torch.enable_grad():
output = fn(**kwargs_to_shard_shard, **kwargs_to_pass)
@ -745,6 +759,16 @@ class SequenceTiledCompute(torch.autograd.Function):
shard_step).view_as(grad_requiring_tensor_shard))
torch.autograd.backward(output, incoming_grad_shard)
if grad_requiring_tensor.shape[0] > 1:
# this is less efficient than the dim0==1 (bs) use case, since it requires a copy to fix
# the stride and a bit more memory for one shard's grad, because
# narrow(dim=1, ...) with dim0>1 will lead to:
# UserWarning: grad and param do not obey the gradient layout contract. This is not an error, but may impair performance.
# when backward is called.
grad_requiring_tensor_grad.narrow(1, shard_offset,
shard_step).view_as(grad_requiring_tensor_shard).copy_(
grad_requiring_tensor_shard.grad)
# positional args
grad_outputs = [None] * 9
# inject the grad for the position of forward input that is grad-requiring
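The bs==1 vs bs>1 split above is purely about memory layout; a small sketch of why a dim=1 narrow is a free alias only when the batch dimension is 1, and why the bs>1 path copies the shard's grad back into the narrowed view:
import torch
grad = torch.zeros(2, 8, 4)              # [bs, seqlen, hidden] with bs > 1
shard_view = grad.narrow(1, 0, 4)        # first seqlen shard
print(shard_view.is_contiguous())        # False: the two batch rows are interleaved in memory
# so the shard's gradient is computed into its own contiguous tensor
# and copied back into the narrowed view afterwards
shard_grad = torch.ones(2, 4, 4)
grad.narrow(1, 0, 4).copy_(shard_grad)
print(torch.zeros(1, 8, 4).narrow(1, 0, 4).is_contiguous())  # True when bs == 1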
@ -832,14 +856,18 @@ class TiledMLP(torch.autograd.Function):
# detach() unsets `x.requires_grad`, so restore it
x.requires_grad_(x_requires_grad)
incoming_grad = grads[0]
bs, seqlen, hidden_size = x.shape
# flatten bs+seqlen to avoid having stride issues when narrowing into seqlen w/ bs>1
x = x.view(-1, hidden_size)
incoming_grad = grads[0].view(-1, hidden_size)
x_grad = torch.zeros_like(x)
x_shards = list(torch.chunk(x, chunks=shards, dim=1))
shard_step = x_shards[0].shape[1]
x_shards = list(torch.chunk(x, chunks=shards, dim=0))
for i, x_shard in enumerate(x_shards):
# Tell deepspeed not to add a new grad to its ipg bucket until the last shard is run
# XXX: DDP, FSDP will need something similar to make it work
if compute_params is not None:
if i + 1 < shards:
for param in compute_params:
@ -851,16 +879,175 @@ class TiledMLP(torch.autograd.Function):
x_shard.requires_grad_(x_requires_grad)
shard_offset = i * shard_step
x_shard.grad = x_grad.narrow(1, shard_offset, shard_step).view_as(x_shard)
incoming_grad_shard = incoming_grad.narrow(1, shard_offset, shard_step).view_as(x_shard)
# if seqlen is not exactly divisible by shards the last step will be shorter than shard_step
shard_step = x_shards[i].shape[0]
shard_offset = i * x_shards[0].shape[0]
x_shard.grad = x_grad.narrow(0, shard_offset, shard_step).view_as(x_shard)
incoming_grad_shard = incoming_grad.narrow(0, shard_offset, shard_step).view_as(x_shard)
with torch.enable_grad():
output = fn(self, x_shard)
torch.autograd.backward(output, incoming_grad_shard)
# unflatten
x_grad = x_grad.view(bs, -1, hidden_size)
return (None, None, x_grad, None, None)
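The flatten-then-shard trick in `TiledMLP.backward` above sidesteps the layout issue entirely: collapsing bs and seqlen into one leading dimension makes every chunk a contiguous slice, so no copy (and no gradient layout contract warning) is needed. A sketch under the same shapes:
import torch
bs, seqlen, hidden_size = 2, 8, 4
x = torch.rand(bs, seqlen, hidden_size)
flat = x.view(-1, hidden_size)                     # [bs*seqlen, hidden_size]
shards = list(torch.chunk(flat, chunks=4, dim=0))
print(all(s.is_contiguous() for s in shards))      # True: dim-0 chunks of a contiguous 2D tensor
print(all(s.is_contiguous() for s in torch.chunk(x, chunks=4, dim=1)))  # False while bs > 1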
class TiledFusedLogitsLoss(torch.autograd.Function):
"""
Perform a tiled loss computation while not manifesting a full logits tensor to massively reduce memory usage.
Args:
- fn: the function to call on sharded inputs
- `self`: the lm_head module object, often it will be `unwrapped_model.model.lm_head`
- `x`: the input (typically `hidden_states`) - which gets sharded
- `y`: the target (typically `labels` or `shift_labels`) - which gets sharded.
- `mask`: an optional mask. It will not be passed to `fn` if set to `None`. If not `None`, it will be sharded along with `x` and `y`
- `shards`: how many shards to use
- compute_params: a list of weights engaged in the compute. Default: `None` (only needed when using DeepSpeed ZeRO)
- output_reduction: "mean" or "sum". If the number of unmasked elements in `x` differs between shards, it's recommended to use "sum" instead of "mean" and perform a balanced mean on the output yourself. This would be the case if `x` is not evenly divisible by `shards` or if the mask leads to a different number of unmasked elements per shard.
Returns:
- the computed `loss`
Note that since this autograd function is typically the last one in the call stack, it performs `backward` inside `forward` and compensates for `output_reduction` artificially. This removes the need to re-run `forward` inside `backward`.
For a generic tiled compute implementation that can handle many other types of `forward` see `SequenceTiledCompute`.
An example:
def loss_fn(self, x, y):
logits = self.lm_head(x)
return self.cross_entropy_loss(logits.view(-1, self.vocab_size), y.view(-1))
x = hidden_states
y = shift_labels
mask = None
shards = 2
compute_params = [self.lm_head.weight]
output_reduction = "mean"
loss = TiledFusedLogitsLoss.apply(
loss_fn,
self,
x,
y,
mask,
shards,
compute_params,
output_reduction,
)
"""
@staticmethod
def forward(
ctx,
fn,
self,
x,
y,
mask,
shards,
compute_params,
output_reduction,
) -> torch.Tensor:
if output_reduction not in ["mean", "sum"]:
raise ValueError(f'unknown reduction {output_reduction}: valid values are: "mean"/"sum"')
if x.dim() < 2:
raise ValueError("x must be at least 2D [batch_size, seq_len, ...]")
if y.dim() < 2:
raise ValueError("y must be at least 2D [batch_size, seq_len, ...]")
if x.shape[:2] != y.shape[:2]:
raise ValueError("x and y batch/seq dims must match")
if mask is not None:
if mask.dim() != 2:
raise ValueError(f"mask must be 2D [batch_size, seq_len], but got {mask.dim()}")
if mask.shape != x.shape[:2]:
raise ValueError(f"mask shape must match x and y batch/seq")
compute_params = [p for p in compute_params if p.requires_grad]
x_requires_grad = x.requires_grad
x = x.detach().requires_grad_(x_requires_grad)
bs, seqlen = x.shape[:2]
# flatten bs+seqlen to avoid having stride issues when narrowing into seqlen w/ bs>1
x = x.view(-1, *x.shape[2:])
y = y.view(-1, *y.shape[2:])
if mask is not None:
mask = mask.view(-1)
incoming_grad = torch.tensor(1.0, dtype=x.dtype, device=x.device)
# we are faking the incoming gradient, and since we perform the reduction outside of `autograd.backward` below, we need to pre-adjust the incoming gradient: in the case of "sum" the gradient is 1.0, in the case of "mean" it's 1.0/num_elements, which here is 1/shards.
if output_reduction == "mean":
incoming_grad /= shards
# XXX: deal with the use case of running in inference mode, where we don't need backward
x_grad = torch.zeros_like(x) if x_requires_grad else None
x_shards = list(torch.chunk(x, chunks=shards, dim=0))
y_shards = list(torch.chunk(y, chunks=shards, dim=0))
if mask is not None:
mask_shards = list(torch.chunk(mask, chunks=shards, dim=0))
output_shards = []
for i, (x_shard, y_shard) in enumerate(zip(x_shards, y_shards)):
# Tell deepspeed not to add a new grad to its ipg bucket until the last shard is run
# XXX: DDP, FSDP will need something similar to make it work
if compute_params is not None:
if i + 1 < shards:
for param in compute_params:
param.ds_grad_is_ready = False
else:
# last shard, can add the grad
for param in compute_params:
param.ds_grad_is_ready = True
x_shard.requires_grad_(x_requires_grad)
# if seqlen is not exactly divisible by shards the last step will be shorter than shard_step
shard_step = x_shards[i].shape[0]
shard_offset = i * x_shards[0].shape[0]
args = (self, x_shard, y_shard)
if mask is not None:
args += (mask_shards[i], )
if x_grad is not None:
x_shard.grad = x_grad.narrow(0, shard_offset, shard_step).view_as(x_shard)
with torch.enable_grad():
output = fn(*args)
output_shards.append(output)
torch.autograd.backward(output, incoming_grad)
else:
output = fn(*args)
output_shards.append(output)
output_unsharded = torch.cat([l.unsqueeze(0) for l in output_shards], dim=0)
if output_reduction == "mean":
output = output_unsharded.mean()
elif output_reduction == "sum":
output = output_unsharded.sum()
# unflatten
if x_grad is not None:
x_grad = x_grad.view(bs, seqlen, *x_grad.shape[1:])
ctx.save_for_backward(x_grad.detach())
return output
@staticmethod
def backward(ctx, *grads) -> torch.Tensor:
(x_grad, ) = ctx.saved_tensors
# grads[0] should normally be 1.0 as it should be coming from loss.backward()
if grads[0] != 1.0:
x_grad *= grads[0]
return (None, None, x_grad, None, None, None, None, None, None)
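The `grads[0] != 1.0` check in `backward` above is just the chain rule applied to the cached gradient: if the caller scales the returned loss before calling `loss.backward()`, the cached `x_grad` has to be scaled by the same factor. A tiny standalone check of that step:
import torch
x = torch.rand(4, requires_grad=True)
(0.5 * x.sum()).backward()      # scaling the loss scales the incoming gradient ...
assert torch.allclose(x.grad, torch.full_like(x, 0.5))   # ... and therefore x.grad, by the same factor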
class AutogradComputeMLP(torch.autograd.Function):
"""
This is a simplified example of overriding the normal MLP via an autograd function, to which tiling can then be added. This simplified version was useful for detecting a leak in DeepSpeed, so let's keep it.

View File

@ -15,10 +15,10 @@ class SparseTensor(object):
def __init__(self, dense_tensor=None):
self.orig_dense_tensor = dense_tensor
self.dtype = self.orig_dense_tensor.dtype
self.is_sparse = dense_tensor.is_sparse
if dense_tensor is not None:
if dense_tensor.is_sparse:
self.is_sparse = dense_tensor.is_sparse
self.dtype = self.orig_dense_tensor.dtype
if self.is_sparse:
dense_tensor = dense_tensor.coalesce()
self.indices = dense_tensor.indices().flatten()
self.values = dense_tensor.values()

View File

@ -86,13 +86,13 @@ class LinearFunctionForZeroStage3(torch.autograd.Function):
# improve efficiency. If you want to make your code simpler, you can
# skip them. Returning gradients for inputs that don't require it is
# not an error.
dim = grad_output.dim()
if ctx.needs_input_grad[0]:
#print(f"Computing grad input weight {weight.shape} grad_output {grad_output.shape}")
grad_input = grad_output.matmul(weight)
#print(f"Computed grad input {grad_input.shape}")
if ctx.needs_input_grad[1]:
#print("Computing grad weight")
dim = grad_output.dim()
if dim > 2:
grad_weight = grad_output.reshape(-1,
grad_output.shape[-1]).t().matmul(input.reshape(-1, input.shape[-1]))
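The `dim > 2` branch above simply collapses all leading dimensions before the matmul; a quick shape check with illustrative sizes:
import torch
b, s, d_in, d_out = 2, 5, 3, 7
inp = torch.rand(b, s, d_in)
grad_output = torch.rand(b, s, d_out)
# [d_out, b*s] @ [b*s, d_in] -> [d_out, d_in], matching the Linear weight's shape
grad_weight = grad_output.reshape(-1, d_out).t().matmul(inp.reshape(-1, d_in))
assert grad_weight.shape == (d_out, d_in)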

View File

@ -1390,8 +1390,8 @@ class Init(InsertPostInitMethodToModuleSubClasses):
handles = []
for dtype in sort_dtypes(dtype_params.keys()):
handles.append(
_all_gather_dtype(params, world_size, rank_in_group, ds_process_group,
allgather_dtype))
_all_gather_dtype(dtype_params[dtype], world_size, rank_in_group, ds_process_group,
dtype))
return MultipleAllGatherHandles(handles)

View File

@ -1656,16 +1656,11 @@ class DeepSpeedZeroOptimizer_Stage3(ZeROOptimizer):
tensor_to_allreduce.div_(dist.get_world_size(group=self.dp_process_group) / float(self.sequence_parallel_size))
if rank is None:
# "All Reducing"
dist.all_reduce(tensor_to_allreduce, group=self.dp_process_group)
else:
global_rank = dist.get_global_rank(self.dp_process_group, rank)
dist.reduce(tensor_to_allreduce, global_rank, group=self.dp_process_group)
# "All Reducing"
dist.all_reduce(tensor_to_allreduce, group=self.dp_process_group)
if communication_data_type != tensor.dtype and tensor is not tensor_to_allreduce:
if rank is None or rank == dist.get_rank(group=self.dp_process_group):
tensor.copy_(tensor_to_allreduce)
tensor.copy_(tensor_to_allreduce)
return tensor

View File

@ -995,6 +995,9 @@ class DeepSpeedZeroOptimizer(ZeROOptimizer):
self.report_ipg_memory_usage("In ipg_remove_grads before reduce_ipg_grads", param.numel(), param.dtype)
self.reduce_ipg_grads()
if self.contiguous_gradients and self.overlap_comm:
if not get_accelerator().resolves_data_dependency():
self.reduction_stream.wait_stream(get_accelerator().current_stream())
get_accelerator().current_stream().wait_stream(self.reduction_stream)
# Swap index between 0 and 1
bucket.index = 1 - bucket.index
self.report_ipg_memory_usage("In ipg_remove_grads after reduce_ipg_grads", param.numel(), param.dtype)
@ -1346,10 +1349,9 @@ class DeepSpeedZeroOptimizer(ZeROOptimizer):
dest_tensor = self.single_partition_of_fp32_groups[i].grad.view(-1).narrow(0, dest_offset, num_elements)
grad_accum = self.get_param_gradient_attribute(param)
if grad_accum is None:
src_tensor = grad_accum.view(-1).narrow(0, source_offset, num_elements)
else:
src_tensor = grad_accum.view(-1).narrow(0, source_offset, num_elements)
assert grad_accum is not None
src_tensor = grad_accum.view(-1).narrow(0, source_offset, num_elements)
if not self.fp16_master_weights_and_gradients:
src_tensor = src_tensor.float()

View File

@ -124,7 +124,7 @@ class FPDT_InputConstruct(torch.nn.Module):
load_balanced_tokens = self.tokens[:, indices]
load_balanced_labels = self.labels[:, indices] if self.labels is not None else self.labels
load_balanced_attention_mask = self.attention_mask if self.attention_mask is not None else self.attention_mask
load_balanced_attention_mask = self.attention_mask
load_balanced_position_ids = self.position_ids[:,
indices] if self.position_ids is not None else self.position_ids

View File

@ -404,7 +404,7 @@ def _create_expert_data_and_model_parallel(expert_parallel_size_, mpu, use_data_
world_size = dist.get_world_size()
rank = dist.get_rank()
dp_world_size = mpu.get_data_parallel_world_size()
dp_world_size = _get_data_parallel_world_size()
pp_world_size = 1 if mpu is None else bwc_pipeline_parallel_world_size(mpu)
_ensure_divisibility(world_size, tensor_parallel_size_)
@ -569,31 +569,37 @@ def _get_data_parallel_group_ranks():
def _get_broadcast_src_rank():
assert dist.is_initialized(), 'dist is not initialized'
return dist.get_global_rank(_get_sequence_data_parallel_group(), 0)
def _get_expert_broadcast_src_rank(group_name):
assert dist.is_initialized(), 'dist is not initialized'
return dist.get_global_rank(_get_expert_data_parallel_group(group_name), 0)
def _get_expert_parallel_world_size(group_name):
"""Return world size for the expert parallel group."""
assert dist.is_initialized(), 'dist is not initialized'
return dist.get_world_size(group=_get_expert_parallel_group(group_name))
def _get_expert_data_parallel_world_size(group_name):
"""Return world size for the expert data parallel group."""
assert dist.is_initialized(), 'dist is not initialized'
return dist.get_world_size(group=_get_expert_data_parallel_group(group_name))
def _get_expert_parallel_rank(group_name):
"""Return my rank for the expert parallel group."""
assert dist.is_initialized(), 'dist is not initialized'
return dist.get_rank(group=_get_expert_parallel_group(group_name))
def _get_expert_parallel_src_rank(group_name):
"""Calculate the global rank corresponding to a local rank zero
in the expert parallel group."""
assert dist.is_initialized(), 'dist is not initialized'
global_rank = dist.get_rank()
local_world_size = _get_expert_parallel_world_size(group_name)
return (global_rank // local_world_size) * local_world_size
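The formula above snaps a global rank down to the first rank of its (contiguous) expert parallel group; a small numeric check, assuming an expert parallel world size of 4:
local_world_size = 4
for global_rank in range(8):
    src = (global_rank // local_world_size) * local_world_size
    assert src == (0 if global_rank < 4 else 4)   # ranks 0-3 map to 0, ranks 4-7 map to 4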
@ -601,11 +607,13 @@ def _get_expert_parallel_src_rank(group_name):
def _get_expert_data_parallel_rank(group_name):
"""Return my rank for the expert data parallel group."""
assert dist.is_initialized(), 'dist is not initialized'
return dist.get_rank(group=_get_expert_data_parallel_group(group_name))
def _get_data_parallel_world_size():
"""Return world size for the data parallel group."""
assert dist.is_initialized(), 'dist is not initialized'
if mesh_device is not None:
return dist.get_world_size(mesh_device.get_group(mesh_dim="data_parallel"))
global mpu
@ -627,11 +635,13 @@ def _get_model_parallel_world_size():
def _get_data_parallel_rank():
"""Return my rank for the data parallel group."""
assert dist.is_initialized(), 'dist is not initialized'
return dist.get_rank(group=_get_data_parallel_group())
def _get_sequence_parallel_world_size():
"""Return world size for the sequence parallel group."""
"""Return world size for the model parallel group."""
assert dist.is_initialized(), 'dist is not initialized'
global mpu
if mesh_device is not None:
return dist.get_world_size(mesh_device.get_group(mesh_dim="sequence_parallel"))

View File

@ -122,7 +122,7 @@ lnav:
- title: 'Transformer Kernel'
url: /tutorials/transformer_kernel/
- title: 'Arctic Long Sequence Training (ALST) for HF Transformers integration'
url: /tutorials/ulysses-alst-sequence-pallellism
url: /tutorials/ulysses-alst-sequence-parallelism
- title: 'ZeRO-Offload'
url: /tutorials/zero-offload/
- title: 'ZeRO'

View File

@ -5,7 +5,7 @@ tags: training sequence-parallelism
In this tutorial we describe how to enable DeepSpeed-Ulysses for Megatron-DeepSpeed. DeepSpeed-Ulysses is a simple but highly communication- and memory-efficient sequence parallelism approach for training large transformer models with massive sequence lengths. It partitions input tensors along the sequence dimension and uses a communication-efficient all-to-all collective for distributed attention computations. Additionally, DeepSpeed-Ulysses incorporates advanced modeling and system optimizations, such as Flash attention, sparse attention, and the ZeRO optimizer, to improve both computational efficiency and memory usage. Training with DeepSpeed sequence parallelism allows both model size and sequence length to scale nearly indefinitely, unbounded by single-GPU memory limitations, and at a high fraction of peak compute performance. Currently, DeepSpeed-Ulysses can handle sequences up to 1 million tokens in length (10 times the size of a complete Harry Potter book!) on 64 A100 GPUs. Please read our [DeepSpeed-Ulysses blog](https://github.com/deepspeedai/DeepSpeed/tree/master/blogs/deepspeed-ulysses) to learn more!
If you're interested in a newer version that works with HF Transformers, please see https://www.deepspeed.ai/tutorials/ulysses-alst-sequence-pallellism
If you're interested in a newer version that works with HF Transformers, please see https://www.deepspeed.ai/tutorials/ulysses-alst-sequence-parallelism
## 1. Installation

View File

@ -74,4 +74,11 @@ Finally, here is a screenshot of `htop` showing host CPU and memory activity dur
<img src="/assets/images/zero_offload_dp1_10B_cpu.png">
</a>
### CPU Adam perf tuning
ZeRO-Offload already supports multi-GPU training. If the workload uses a CPU optimizer, it can be further tuned by passing `--bind_cores_to_rank` to the deepspeed launch command. This switch mainly does two things:
1. Divide physical CPU cores evenly among ranks, so that each rank has a dedicated set of CPU cores to run its CPU optimizer.
2. Set the OMP_NUM_THREADS environment variable to the number of CPU cores assigned to each rank, so the OpenMP code in the CPU optimizer runs with near-optimal performance.
ZeRO-Offload is a hybrid workload that is heavy on both the GPU and the CPU, and DeepSpeed is optimized for both GPU and CPU performance. Refer to [How to launch DeepSpeed on Intel Architecture CPU](https://github.com/deepspeedai/DeepSpeed/blob/master/docs/_tutorials/accelerator-setup-guide.md#how-to-launch-deepspeed-on-intel-architecture-cpu) for more details on how to tune core bindings for CPU performance.
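The arithmetic behind the two steps above is simple; a hedged sketch of what `--bind_cores_to_rank` style splitting amounts to per rank (the numbers are illustrative, and the real logic lives in the DeepSpeed launcher):
import os
physical_cores = 56                                   # e.g. from `lscpu`, hyperthreads excluded
ranks_per_node = 8
cores_per_rank = physical_cores // ranks_per_node     # 7 dedicated physical cores per rank
os.environ["OMP_NUM_THREADS"] = str(cores_per_rank)   # what the launcher sets for each rank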
Congratulations! You have completed the ZeRO-Offload tutorial.

View File

@ -431,7 +431,6 @@ class OpBuilder(ABC):
print(f"{WARNING} {self.name} cuda is missing or is incompatible with installed torch, "
"only cpu ops can be compiled!")
return '-D__DISABLE_CUDA__'
return '-D__DISABLE_CUDA__'
def _backup_cpuinfo(self):
# Construct cpu_info dict from lscpu that is similar to what py-cpuinfo provides

View File

@ -39,7 +39,6 @@ class CCLCommBuilder(CPUOpBuilder):
raise ValueError(
"Didn't find CCL_ROOT, install oneCCL from https://github.com/oneapi-src/oneCCL and source its environment variable"
)
return []
else:
return ['-lccl', f'-L{ccl_root_path}/lib']

View File

@ -1,73 +0,0 @@
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0
# DeepSpeed Team
import pytest
import os
import torch
from deepspeed.accelerator import get_accelerator
@pytest.mark.evaluation
@pytest.mark.parametrize("model_name", ["codellama/CodeLlama-7b-Python-hf"])
def test_human_eval(model_name):
import mii
import numpy
from transformers import pipeline
from human_eval.data import write_jsonl, read_problems
from human_eval.evaluation import evaluate_functional_correctness
def generate_base_completion(pipe, problem_prompt: str) -> str:
return pipe(problem_prompt, do_sample=True)[0]["generated_text"]
def generate_mii_completion(pipe, problem_prompt: str) -> str:
return pipe(problem_prompt, max_new_tokens=512)[0].generated_text
def generate_samples(pipe, generation_function):
samples = [
dict(task_id=task_id, completion=generation_function(pipe, problems[task_id]["prompt"]))
for task_id in problems for _ in range(num_samples_per_task)
]
return samples
# Loading Problems
problems = read_problems("../../human-eval/data/HumanEval.jsonl.gz")
num_samples_per_task = 20
# Initializing HuggingFace Pipeline
local_rank = os.getenv("LOCAL_RANK", "0")
device = torch.device(get_accelerator().device_name(local_rank))
base_pipe = pipeline(model=model_name,
device=torch.device(get_accelerator().device_name(local_rank)),
max_length=512,
return_full_text=False)
# Generating Base Samples
base_samples = generate_samples(base_pipe, generate_base_completion)
# Base Pipeline Teardown
del base_pipe
get_accelerator().empty_cache()
# Initializing DeepSpeed-MII Pipeline
mii_pipe = mii.pipeline(model_name)
# Generating MII Samples
mii_samples = generate_samples(mii_pipe, generate_mii_completion)
# MII Pipeline Teardown
mii_pipe.destroy()
# Writing Samples
write_jsonl("base_samples.jsonl", base_samples)
write_jsonl("mii_samples.jsonl", mii_samples)
# Evaluating Samples
base_results = evaluate_functional_correctness("base_samples.jsonl")
mii_results = evaluate_functional_correctness("mii_samples.jsonl")
# Executing Assertions
for key in base_results.keys():
assert numpy.allclose(base_results[key], mii_results[key], rtol=0.10), \
f"Base result: {base_results[key]}, MII result: {mii_results[key]}, outside of rtol."

View File

@ -553,6 +553,61 @@ class TestInjectionPolicy(DistributedTest):
assert assert_fn(bs_output, ds_output)
@pytest.mark.seq_inference
@pytest.mark.parametrize("model_w_task", [("Felladrin/Llama-160M-Chat-v1", "text-generation")], ids=["llama"])
@pytest.mark.parametrize("dtype", [torch.half], ids=["fp16"])
class TestLlamaInjection(DistributedTest):
world_size = 1
def test(self, model_w_task, dtype, query, inf_kwargs, assert_fn):
invalid_test_msg = validate_test(model_w_task, dtype, enable_cuda_graph=False, enable_triton=False)
if invalid_test_msg:
pytest.skip(invalid_test_msg)
if dtype not in get_accelerator().supported_dtypes():
pytest.skip(f"Accelerator {get_accelerator().device_name()} does not support {dtype}.")
if not deepspeed.ops.__compatible_ops__[InferenceBuilder.NAME]:
pytest.skip("This op had not been implemented on this system.", allow_module_level=True)
model, task = model_w_task
local_rank = int(os.getenv("LOCAL_RANK", "0"))
device = torch.device(get_accelerator().device_name(local_rank))
pipe = pipeline(task,
model=model,
device=torch.device("cpu"),
model_kwargs={"low_cpu_mem_usage": True},
framework="pt")
if dtype == torch.half:
pipe.model.half()
pipe.device = device
pipe.model.to(device)
bs_output = pipe(query, **inf_kwargs)
try:
pipe.model = deepspeed.init_inference(pipe.model,
mp_size=self.world_size,
dtype=dtype,
replace_with_kernel_inject=True)
check_injection(pipe.model)
except AttributeError as e:
if "'LlamaAttention' object has no attribute 'num_heads'" in str(e):
pytest.skip("Skipping due to transformers version compatibility issue with self-attention")
raise e
ds_output = pipe(query, **inf_kwargs)
print(local_rank, "baseline", bs_output)
print(local_rank, "deepspeed", ds_output)
# Llama models are not matching baseline exactly
# We skip the result check for now, since this is irrelevant to this test
# assert assert_fn(bs_output, ds_output)
@pytest.mark.seq_inference
@pytest.mark.parametrize('keep_module_on_host', [True, False])
@pytest.mark.parametrize(

View File

@ -394,7 +394,7 @@ class TestIncorectAllgatherBucketSize(DistributedTest):
class TestPartitionNcclAlignment(DistributedTest):
world_size = 4
world_size = 2
def test(self, zero_stage=2):
config_dict = {
@ -835,7 +835,7 @@ class TestZero3ParamPartitioningBase(DistributedTest):
@pytest.mark.parametrize("init_context_manager", [True, False])
@pytest.mark.parametrize("reduce_scatter", [True, False])
class TestZero3ParamPartitioningLargeParam(DistributedTest):
world_size = 4
world_size = 2
def test(self, init_context_manager: bool, reduce_scatter: bool, param_sz: int = 8100) -> None:
@ -997,7 +997,7 @@ class TestZero3ParamPartitioningManyParams(DistributedTest):
class TestZero3InitForParentWeightInitialization(DistributedTest):
world_size = 4
world_size = 2
def test(self):

View File

@ -6,11 +6,11 @@
Arctic Long Sequence Training (ALST) Tiled compute component tests
"""
from deepspeed.runtime.sequence_parallel.ulysses_sp import TiledMLP, sequence_tiled_compute
from deepspeed.runtime.sequence_parallel.ulysses_sp import TiledMLP, sequence_tiled_compute, TiledFusedLogitsLoss
from deepspeed.utils import safe_get_full_grad
from torch.nn import Linear, Module
from unit.common import DistributedTest, preferred_dtype
from unit.util import torch_assert_equal, torch_assert_close
from unit.util import torch_assert_equal, torch_assert_close, CaptureStderr
import deepspeed
import pytest
import torch
@ -97,11 +97,12 @@ def mlp_forward_sequence_tiled_compute(self, x):
)
@pytest.mark.parametrize("batch_size", [1, 2])
@pytest.mark.parametrize("zero_stage", [1, 3])
class TestTiledCompute(DistributedTest):
world_size = 1
def test_tiled_mlp(self, zero_stage):
def test_tiled_mlp(self, zero_stage, batch_size):
config_dict = {
"train_micro_batch_size_per_gpu": 1,
@ -127,8 +128,8 @@ class TestTiledCompute(DistributedTest):
vocab_size = 10
seed = 42
hidden_dim = 128
bs = 2
seqlen = 64
bs = batch_size
seqlen = 125 # use a non 2**n length to test varlen shards (last short)
torch.manual_seed(seed)
x = torch.rand((bs, seqlen, hidden_dim), dtype=dtype, requires_grad=True)
y = torch.empty((bs, seqlen), dtype=torch.long, requires_grad=False).random_(vocab_size)
@ -166,7 +167,12 @@ class TestTiledCompute(DistributedTest):
x_b = x.clone().detach().requires_grad_(True)
y_b = y.clone().detach()
loss_b = model_b(x_b, y_b)
model_b.backward(loss_b)
with CaptureStderr() as cs:
model_b.backward(loss_b)
# see the explanation inside TiledMLP.backward
assert "grad and param do not obey the gradient layout contract" not in cs.err, f"stride issue: {cs.err}"
param_grad_b1 = get_grad(model_b.module.mlp1.up_proj.weight, zero_stage)
param_grad_b2 = get_grad(model_b.module.mlp2.up_proj.weight, zero_stage)
x_grad_b = x_b.grad
@ -202,7 +208,11 @@ class TestTiledCompute(DistributedTest):
x_c = x.clone().detach().requires_grad_(True)
y_c = y.clone().detach()
loss_c = model_c(x_c, y_c)
model_c.backward(loss_c)
with CaptureStderr() as cs:
model_c.backward(loss_c)
assert "grad and param do not obey the gradient layout contract" not in cs.err, f"stride issue: {cs.err}"
param_grad_c1 = get_grad(model_c.module.mlp1.up_proj.weight, zero_stage)
param_grad_c2 = get_grad(model_c.module.mlp2.up_proj.weight, zero_stage)
x_grad_c = x_c.grad
@ -219,3 +229,129 @@ class TestTiledCompute(DistributedTest):
torch_assert_close(param_grad_a1, param_grad_c1) #, rtol=1e-03, atol=1e-04)
torch_assert_close(param_grad_a2, param_grad_c2) #, rtol=1e-03, atol=1e-04)
torch_assert_close(x_grad_a, x_grad_c)
@pytest.mark.parametrize("batch_size", [1, 2])
@pytest.mark.parametrize("zero_stage", [1, 3])
class TestTiledFusedLogitsLoss(DistributedTest):
world_size = 1
def test_tiled_fused_logits_loss(self, zero_stage, batch_size):
def tiled_forward(self, x, y):
x = self.mlp1(x)
x = self.mlp2(x)
def loss_fn(self, x, y):
logits = self.lm_head(x)
return self.cross_entropy_loss(logits.view(-1, self.vocab_size), y.view(-1))
mask = None
shards = 2
compute_params = [self.lm_head.weight]
output_reduction = "mean"
loss = TiledFusedLogitsLoss.apply(
loss_fn,
self,
x,
y,
mask,
shards,
compute_params,
output_reduction,
)
return loss
config_dict = {
"train_micro_batch_size_per_gpu": 1,
"zero_optimization": {
"stage": zero_stage
},
"optimizer": {
"type": "Adam",
"params": {
"lr": 1e-3
}
},
}
dtype = preferred_dtype()
#dtype = torch.float
if dtype == torch.bfloat16:
config_dict["bf16"] = {"enabled": True}
elif dtype == torch.float16:
config_dict["fp16"] = {"enabled": True, "loss_scale": 1.0}
# for debug
# torch.set_printoptions(precision=8, sci_mode=True)
vocab_size = 100
seed = 42
hidden_dim = 64
bs = batch_size
seqlen = 425 # use a non 2**n length to test varlen shards (last short)
torch.manual_seed(seed)
x = torch.rand((bs, seqlen, hidden_dim), dtype=dtype, requires_grad=True)
y = torch.empty((bs, seqlen), dtype=torch.long, requires_grad=False).random_(vocab_size)
# A. Baseline: model with normal loss
torch.manual_seed(seed)
model_a = MyModel(hidden_dim=hidden_dim, vocab_size=vocab_size).to(dtype)
model_a, _, _, _ = deepspeed.initialize(config=config_dict,
model=model_a,
model_parameters=model_a.parameters())
x = x.to(model_a.device)
y = y.to(model_a.device)
x_a = x.clone().detach().requires_grad_(True)
y_a = y.clone().detach()
loss_a = model_a(x_a, y_a)
model_a.backward(loss_a)
param_grad_a = get_grad(model_a.module.lm_head.weight, zero_stage)
x_grad_a = x_a.grad
assert param_grad_a is not None
assert x_grad_a is not None
# B. model with fused tiled logits loss
torch.manual_seed(seed)
MyModel.forward_orig = MyModel.forward
MyModel.forward = tiled_forward
model_b = MyModel(hidden_dim=hidden_dim, vocab_size=vocab_size).to(dtype)
model_b, _, _, _ = deepspeed.initialize(config=config_dict,
model=model_b,
model_parameters=model_b.parameters())
x_b = x.clone().detach().requires_grad_(True)
y_b = y.clone().detach()
loss_b = model_b(x_b, y_b)
with CaptureStderr() as cs:
model_b.backward(loss_b)
# see the explanation inside TiledMLP.backward
assert "grad and param do not obey the gradient layout contract" not in cs.err, f"stride issue: {cs.err}"
param_grad_b = get_grad(model_b.module.lm_head.weight, zero_stage)
x_grad_b = x_b.grad
assert param_grad_b is not None
assert x_grad_b is not None
# print(f"{loss_a=}")
# print(f"{loss_b=}")
# print(f"{x_grad_a=}")
# print(f"{x_grad_b=}")
# print(f"{param_grad_a=}")
# print(f"{param_grad_b=}")
# usually this is an exact match, but on cpu CI this fails.
torch_assert_close(loss_a, loss_b)
# Gradients will not be exactly the same, especially under half-precision. And bf16 is
# particularly lossy, so we need to lower the tolerance a bit more than the default. Switch to
# dtype torch.float or even torch.double to see that the diff is tiny - so the math is
# correct, but accumulation error adds up. Alternatively making hidden_dim bigger makes the
# divergence much smaller as well.
torch_assert_close(x_grad_a, x_grad_b)
torch_assert_close(param_grad_a, param_grad_b) #, rtol=1e-03, atol=1e-04)
# restore
MyModel.forward = MyModel.forward_orig

View File

@ -3,13 +3,17 @@
# DeepSpeed Team
import pytest
import torch
import deepspeed
from deepspeed.accelerator import get_accelerator, is_current_accelerator_supported
from deepspeed.git_version_info import torch_info
from io import StringIO
import deepspeed
import logging
import pytest
import re
import sys
import torch
def skip_on_arch(min_arch=7):
if get_accelerator().device_name() == 'cuda':
@ -119,3 +123,193 @@ def torch_assert_dicts_of_tensors_equal(actual, expected, **kwargs):
"""
for k in actual.keys():
torch.testing.assert_close(actual[k], expected[k], rtol=0.0, atol=0.0, **kwargs)
# CaptureStd, CaptureLogger context managers from https://github.com/stas00/ml-engineering/blob/master/testing/testing_utils.py
# When any function contains print() calls that get overwritten, like progress bars,
# special care needs to be applied, since under pytest -s captured output (capsys
# or contextlib.redirect_stdout) contains any temporary printed strings, followed by
# \r's. This helper function ensures that the buffer will contain the same output
# with and without -s in pytest, by turning:
# foo bar\r tar mar\r final message
# into:
# final message
# it can handle a single string or a multiline buffer
def apply_print_resets(buf):
return re.sub(r"^.*\r", "", buf, 0, re.M)
class CaptureStd:
"""
Context manager to capture:
- stdout: replay it, clean it up and make it available via ``obj.out``
- stderr: replay it and make it available via ``obj.err``
- combined: combine the chosen streams and make it available via ``obj.combined``
init arguments:
- out - capture stdout:`` True``/``False``, default ``True``
- err - capture stderr: ``True``/``False``, default ``True``
- replay - whether to replay or not: ``True``/``False``, default ``True``. By default each
captured stream gets replayed back on context's exit, so that one can see what the test was
doing. If this behavior is not wanted and the captured data shouldn't be replayed, pass
``replay=False`` to disable this feature.
Examples::
# to capture stdout only with auto-replay
with CaptureStdout() as cs:
print("Secret message")
assert "message" in cs.out
# to capture stderr only with auto-replay
import sys
with CaptureStderr() as cs:
print("Warning: ", file=sys.stderr)
assert "Warning" in cs.err
# to capture both streams with auto-replay
with CaptureStd() as cs:
print("Secret message")
print("Warning: ", file=sys.stderr)
assert "message" in cs.out
assert "Warning" in cs.err
# to capture just one of the streams, and not the other, with auto-replay
with CaptureStd(err=False) as cs:
print("Secret message")
assert "message" in cs.out
# but best use the stream-specific subclasses
# to capture without auto-replay
with CaptureStd(replay=False) as cs:
print("Secret message")
assert "message" in cs.out
# sometimes it's easier to not try to figure out if it's stdout or stderr, and yet at
# other times the software may send the same output to stderr or stdout depending on
# environment, so to make the test robust a combined entry of both streams is available
"""
def __init__(self, out=True, err=True, replay=True):
self.replay = replay
if out:
self.out_buf = StringIO()
self.out = "error: CaptureStd context is unfinished yet, called too early"
else:
self.out_buf = None
self.out = "not capturing stdout"
if err:
self.err_buf = StringIO()
self.err = "error: CaptureStd context is unfinished yet, called too early"
else:
self.err_buf = None
self.err = "not capturing stderr"
self.combined = "error: CaptureStd context is unfinished yet, called too early"
def __enter__(self):
if self.out_buf is not None:
self.out_old = sys.stdout
sys.stdout = self.out_buf
if self.err_buf is not None:
self.err_old = sys.stderr
sys.stderr = self.err_buf
self.combined = ""
return self
def __exit__(self, *exc):
if self.out_buf is not None:
sys.stdout = self.out_old
captured = self.out_buf.getvalue()
if self.replay:
sys.stdout.write(captured)
self.out = apply_print_resets(captured)
self.combined += self.out
if self.err_buf is not None:
sys.stderr = self.err_old
captured = self.err_buf.getvalue()
if self.replay:
sys.stderr.write(captured)
self.err = captured
self.combined += self.err
def __repr__(self):
msg = ""
if self.out_buf:
msg += f"stdout: {self.out}\n"
if self.err_buf:
msg += f"stderr: {self.err}\n"
return msg
# in tests it's best to capture only the stream that's wanted, otherwise
# it's easy to miss things, so unless you need to capture both streams, use the
# subclasses below (less typing). Or alternatively, configure `CaptureStd` to
# disable the stream you don't need to test.
class CaptureStdout(CaptureStd):
"""Same as CaptureStd but captures only stdout"""
def __init__(self, replay=True):
super().__init__(err=False, replay=replay)
class CaptureStderr(CaptureStd):
"""Same as CaptureStd but captures only stderr"""
def __init__(self, replay=True):
super().__init__(out=False, replay=replay)
class CaptureLogger:
"""
Context manager to capture `logging` streams
Args:
- logger: 'logging` logger object
Results:
The captured output is available via `self.out`
Example::
>>> from transformers import logging
>>> from transformers.testing_utils import CaptureLogger
>>> msg = "Testing 1, 2, 3"
>>> logging.set_verbosity_info()
>>> logger = logging.get_logger("transformers.models.bart.tokenization_bart")
>>> with CaptureLogger(logger) as cl:
... logger.info(msg)
>>> assert cl.out, msg+"\n"
"""
def __init__(self, logger):
self.logger = logger
self.io = StringIO()
self.sh = logging.StreamHandler(self.io)
self.out = ""
def __enter__(self):
self.logger.addHandler(self.sh)
return self
def __exit__(self, *exc):
self.logger.removeHandler(self.sh)
self.out = self.io.getvalue()
def __repr__(self):
return f"captured: {self.out}\n"

View File

@ -1 +1 @@
0.17.3
0.17.5