Compare commits

...

57 Commits

Author SHA1 Message Date
173368cb63 CUDAEvent::elapsed_time could accidentally initialize a non-used GPU (#122538)
This sets the device before calling cudaEventElapsedTime to avoid the case
where the "cudaGetCurrentDevice" device would be initialized even though
neither event is on that device.
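For illustration, a minimal Python sketch of the user-visible scenario (an assumption on my part, not part of the PR; it needs at least 2 GPUs):
```python
# Both events are recorded on cuda:1 while the current device stays cuda:0.
# Before this fix, elapsed_time() could create a context on the unused cuda:0;
# with the fix, the device is switched to the events' device first.
import torch

with torch.cuda.device(1):
    start = torch.cuda.Event(enable_timing=True)
    end = torch.cuda.Event(enable_timing=True)
    start.record()
    torch.randn(1024, 1024, device="cuda").sum()
    end.record()
    torch.cuda.synchronize()

# Called while cuda:0 is the current device.
print(start.elapsed_time(end), "ms")
```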
Pull Request resolved: https://github.com/pytorch/pytorch/pull/122538
Approved by: https://github.com/shuqiangzhang, https://github.com/wconstab
2024-04-22 15:39:28 -07:00
b86edd97d6 [nccl-pg] print broadcast ncclunique id duration (#123963)
Summary: Print NCCL PG broadcast nccl unique id duration for measurement.

Differential Revision: D56048059

Pull Request resolved: https://github.com/pytorch/pytorch/pull/123963
Approved by: https://github.com/wconstab
2024-04-16 17:03:25 -07:00
b33a283e9a [nccl-pg] Pass pg name and desc to NCCL communicator (#124149)
Summary:
Pass the Process Group Name and Desc to the NCCL communicator in order to access PG information in the NCCL layer.
The information is passed as a commDesc string (i.e. "<pg_desc>:<pg_name>").
The function is only valid when NCCL_COMM_DESCRIPTION is defined.

Differential Revision: D55703310

Pull Request resolved: https://github.com/pytorch/pytorch/pull/124149
Approved by: https://github.com/shuqiangzhang
2024-04-16 15:08:38 -07:00
7a551d81e5 [c10d/nccl-pg] allow user to pass process group description (#123472)
Summary:
We need a way to allow users to set a customized description for a process group, e.g. FSDP, PP.

Here are several use cases of a user-specified group_desc:
- Logging: we can easily match a log line and understand what this collective/PG is used for.
- PyTorch traces (e.g. Kineto, Execution Trace) can benefit from the PG desc, since trace analysis and benchmarks will be able to easily differentiate PG purposes like FSDP, PP.
- Lower-layer collective (e.g. NCCL) debugging: we will be able to expose the PG desc to the NCCL communicator so NCCL-layer operations can be easily correlated to a PG.

Solution: Add a group_desc field to c10d
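A minimal usage sketch (assuming a 2-rank NCCL job launched with torchrun; the Python binding shown is the one exercised by the test diff further below, and the description string is a placeholder):
```python
import torch.distributed as dist

dist.init_process_group("nccl")

# Tag a sub-group with its purpose; the default PG reports "default_pg"
# and groups created without a description report "undefined".
pp_group = dist.new_group([0, 1], group_desc="pipeline_parallel")
print(pp_group.group_desc)  # "pipeline_parallel"
```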

Differential Revision: D55781850

Pull Request resolved: https://github.com/pytorch/pytorch/pull/123472
Approved by: https://github.com/kwen2501
2024-04-16 15:08:38 -07:00
1515a90475 [DCP] Adds ability to create a CPU state dict that is both shared and pinned (#122338)
[DCP] Adds ability to create a CPU state dict that is both shared and pinned, as well as a new utility specific to copying the state dict

https://docs.nvidia.com/cuda/cuda-runtime-api/group__CUDART__MEMORY.html#group__CUDART__MEMORY_1ge8d5c17670f16ac4fc8fcb4181cb490c
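A minimal sketch of the new (private) utilities, based on the test added in this PR (visible in the diff below); the state-dict entry names are placeholders and a CUDA build/GPU is assumed:
```python
import torch
from torch.distributed._state_dict_utils import (
    _copy_state_dict,
    _create_cpu_state_dict,
)

state_dict = {"weight": torch.ones(10, device="cuda"), "step": torch.tensor(7.0)}

# Allocate a CPU mirror whose tensors are both in shared memory and pinned,
# then copy the GPU values into it (e.g. for async checkpointing).
cpu_state_dict = _create_cpu_state_dict(state_dict, share_memory=True, pin_memory=True)
_copy_state_dict(state_dict, cpu_state_dict)
```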

Pull Request resolved: https://github.com/pytorch/pytorch/pull/122338
Approved by: https://github.com/fegin
2024-04-16 15:08:22 -07:00
4882ec2a91 Pass and record process_group_name when creating ProcessGroupNCCL (#123117)
Summary:
Pass the Python c10d group_name to the C++ ProcessGroupNCCL so that the PG name will be consistent across different layers.
Also record pg_name in the flight recorder entry.

Differential Revision: D55597200

Pull Request resolved: https://github.com/pytorch/pytorch/pull/123117
Approved by: https://github.com/wconstab
2024-04-16 13:48:35 -07:00
972b8060bd [c10d] make monitorThread sleep when we try to dump (#123788)
Summary:
We separated the FR dump logic from the desync debug logic,
so we no longer set collectiveDebugInfoMode_ to true when we just need an FR
dump. That's why the monitor thread did not sleep and tried to kill the
process without waiting for the dump.

The fix is simple: we should sleep whenever shouldDump_ is true.
Test Plan:
Existing unit tests

Pull Request resolved: https://github.com/pytorch/pytorch/pull/123788
Approved by: https://github.com/wconstab
2024-04-11 09:19:15 -07:00
3e7683ae18 [c10d] dump on any exception (timeout + nccl error) (#123023)
Summary:
The existing flight recorder dumping logic is: dump only on timeout, but not
on NCCL error. This resulted in the faulty ranks missing dumps when an NCCL
error happens.

So in this PR, we revise the dump logic such that records are dumped
when any exception is detected. An exception could be 1. an NCCL async error, or
2. a watchdog timeout.

Also, the existing code tends to mix the logic of flight recorder dump
and desync debug, which is not desirable. We now dump the desync debug
report only when a timeout is detected.
Test Plan:
Added a new unit test to trigger nccl error and dump, and make sure the
dump is triggered by the error.

Also existing dump on timeout tests should still pass.

(sqzhang_1) [sqzhang@devgpu009.cln1 ~/pytorch (84bf9d4c)]$ python
test/distributed/test_c10d_nccl.py NcclErrorDumpTest
NCCL version 2.19.3+cuda12.0
[E329 19:15:11.775879730 ProcessGroupNCCL.cpp:565] [Rank 0] Watchdog
caught collective operation timeout: WorkNCCL(SeqNum=2,
OpType=ALLREDUCE, NumelIn=10, NumelOut=10, Timeout(ms)=10000) ran for
10028 milliseconds before timing out.
[E329 19:15:11.777459894 ProcessGroupNCCL.cpp:1561] [PG 0 Rank 0]
Exception hit in NCCL work: 2
[E329 19:15:12.660717323 ProcessGroupNCCL.cpp:1332] [PG 0 Rank 0]
Received a timeout signal from this local rank and will start to dump
the debug info. Last enqueued NCCL work: 2, last completed NCCL work: 1.
[E329 19:15:12.660932242 ProcessGroupNCCL.cpp:1167] [PG 0 Rank 0]
ProcessGroupNCCL preparing to dump debug info.
[E329 19:15:12.661192990 ProcessGroupNCCL.cpp:1174] [PG 0 Rank 0]
ProcessGroupNCCL dumping nccl trace to /tmp/tmp06psqil3/trace_0
[F329 19:15:12.661485601 ProcessGroupNCCL.cpp:1185] [PG 0 Rank 0] [PG 0
Rank 0] ProcessGroupNCCL's watchdog detected a collective timeout from
the local rank. This is most likely caused by incorrect usages of
collectives, e.g., wrong sizes used across ranks, the order of
collectives is not same for all ranks or the scheduled collective, for
some reason, didn't run. Additionally, this can be caused by GIL
deadlock or other reasons such as network errors or bugs in the
communications library (e.g. NCCL), etc. We tried our best to dump the
debug info into the storage to help you debug the issue.

Tags:

Pull Request resolved: https://github.com/pytorch/pytorch/pull/123023
Approved by: https://github.com/wconstab
2024-04-02 15:41:15 -07:00
f2e9ec2dc5 [c10d] dump from one and only one thread (PG0's monitor thread) (#120893)
Summary:
When there are multiple PGs in a process and a hardware failure happens,
we found that multiple PGs/threads in the same
process are competing to dump the same records at the same time. This
affects the reliability of dumps.

In this PR, we make the change such that only one thread/PG
could dump: PG0's monitor thread. We use a static variable to indicate
that something (e.g., a collective timeout) has triggered the dump
locally.

The monitor thread dumps debug info under any one of 3 conditions:
1: the static variable is set to true by the watchdog thread when it detects
a timeout or a pipe dump signal
2: a timeout signal is received from other ranks through the TCPStore
3: no heartbeat from the watchdog
Test Plan:
python test/distributed/test_c10d_nccl.py -k
test_timeout_dumps_on_stuck_ranks

Pull Request resolved: https://github.com/pytorch/pytorch/pull/120893
Approved by: https://github.com/wconstab
2024-04-02 15:36:05 -07:00
dde4324d8e [NCCL PG] Enable ncclCommDevIdxMap unconditionally (#122049)
Differential Revision: D54993977

The initial purpose of ncclCommDevIdxMap is to support NCCL zero copy algorithms. Therefore, it is only enabled (with its values filled) if useTensorRegisterAllocatorHook_ is set to true. However, now we rely on it to support dumping NCCL information in a single PG. So we need it to be always available, regardless of whether we enabled useTensorRegisterAllocatorHook_.
Move the code that fills ncclCommDevIdxMap out of the if (useTensorRegisterAllocatorHook_) statement.

See diff

Pull Request resolved: https://github.com/pytorch/pytorch/pull/122049
Approved by: https://github.com/shuqiangzhang
2024-03-26 17:14:06 -07:00
94c079104d [c10d] fix the macro definition of NCCL_COMM_DUMP (#120502)
Summary:
Only if both macros are defined should we do the comm dump;
otherwise, use the original definition.

The previous implementation missed the function definition when IS_NCCL_EXP is defined but NCCL_COMM_DUMP is not.

Test Plan:
Build and unit test

Pull Request resolved: https://github.com/pytorch/pytorch/pull/120502
Approved by: https://github.com/dsjohns2, https://github.com/Skylion007
2024-03-26 14:09:00 -07:00
a6afee6d94 [c10d][flight recorder] dump additional NCCL debug info (#120063)
Summary:
This PR is mainly about the flight recorder side of the changes: it takes a
map of maps as input and dumps it as a picklable object. It also adds functions that
should be compiled only when NCCL_COMM_DUMP is defined.
Test Plan:
Integration tests with NCCL will be done later; here we only do the
c10d side of the dump test, aka NCCLTraceTest.

Testing the dump function is a bit tricky as we don't have
existing C++ unit tests for it. So we still use the Python NCCLTraceTest with
the Python binding of _dump_nccl_trace(); we manually feed
dump_nccl_trace with a map of test info, assert the pickle result, and
print the converted Python dict:
```
(sqzhang_1) [sqzhang@devgpu009.cln1 ~/pytorch (main)]$  python
test/distributed/test_c10d_nccl.py NCCLTraceTest
NCCL version 2.19.3+cuda12.0
[rank0]:[E ProcessGroupNCCL.cpp:1200] [PG 0 Rank 0] ProcessGroupNCCL
preparing to dump debug info.
.NCCL version 2.19.3+cuda12.0
.NCCL version 2.19.3+cuda12.0
{'ncclID2': {'Key2': 'Value2', 'Key1': 'Value1'}, 'ncclID1': {'Key2':
'Value2', 'Key1': 'Value1'}}
{'ncclID2': {'Key2': 'Value2', 'Key1': 'Value1'}, 'ncclID1': {'Key2':
'Value2', 'Key1': 'Value1'}}
.NCCL version 2.19.3+cuda12.0
{'ncclID2': {'Key2': 'Value2', 'Key1': 'Value1'}, 'ncclID1': {'Key2':
'Value2', 'Key1': 'Value1'}}
{'ncclID2': {'Key2': 'Value2', 'Key1': 'Value1'}, 'ncclID1': {'Key2':
'Value2', 'Key1': 'Value1'}}
.NCCL version 2.19.3+cuda12.0
.NCCL version 2.19.3+cuda12.0
.NCCL version 2.19.3+cuda12.0
.NCCL version 2.19.3+cuda12.0
.
----------------------------------------------------------------------
Ran 8 tests in 95.761s
OK
```

Pull Request resolved: https://github.com/pytorch/pytorch/pull/120063
Approved by: https://github.com/wconstab
2024-03-26 14:08:19 -07:00
d092857531 [Caffe2 CPU tests] Update CMakeLists.txt 2024-02-24 12:18:10 -08:00
6aad5e444a Fix missing MAST log when there is Unicode non-decodable text in logs (#119298)
Summary:
## Issue
When there is Unicode non-decodable text in logs, `tail_logger` will stop working afterwards, i.e. f527390102

In the example, the process stopped producing Python logs after 17:20:21 until the job finished
```
[0]:I0201 17:20:21.338000 3429 gen_ai/genie_projects/llm/metaformers/reward_model_score.py:335] Progress: 118 batches out of 512 total batches. 23.05 % | (gpu mem: 25.8GB, free CPU mem: 1387.8GB)
I0201 17:39:14 Stopping twtask-main.service with Service Result: [success] Exit Code: [exited] Exit Status: [0]
```
At the end, a `UnicodeDecodeError` was thrown with no call stack.

## Fix
Use `errors="replace"` to avoid throwing exception when `UnicodeDecodeError` happens.

Test Plan: f528854819

Differential Revision: D53483644

Co-authored-by: Jack Zhang <jackzh@meta.com>
Pull Request resolved: https://github.com/pytorch/pytorch/pull/119298
Approved by: https://github.com/XilunWu
2024-02-24 12:16:39 -08:00
c54ce9313b [c10d][flight recorder] store a copy of string in entry (#119837)
Summary:
Previously, we just stored the char pointer in the entry; the string is a
temp object and will be destructed by the time we want to dump/access it.

A quick fix is to store a copy of the string, without changing the
upstream char*.

An alternative is to change every profilingTitle into std::string; this,
however, would need a comprehensive overhaul of the code up to the
c10d::work layer above workNCCL, RecordFunction, etc.

We chose the first option for this change.

Resolve #119808

Pull Request resolved: https://github.com/pytorch/pytorch/pull/119837
Approved by: https://github.com/zdevito, https://github.com/wconstab
2024-02-14 11:38:10 -08:00
1fe59f4ef7 [c10d][flight recorder] remove unintended assignment of entry (#119748)
Summary:
auto& entry = entries_.at(*id % max_entries_);
entry = entries_.at(*id % max_entries_);
The above line of code has the unintended consequence of invoking the copy assignment
of entry objects, as the ref itself cannot be re-assigned.

Also, what could cause the crash is that the entry ref could become invalid if entries_ is
resized by other threads, and this could result in a 'copy to a garbage
location'. The fix is to use a pointer, which can be re-assigned after
re-acquiring the lock.

Tests: python test/distributed/test_c10d_nccl.py NCCLTraceTest

Pull Request resolved: https://github.com/pytorch/pytorch/pull/119748
Approved by: https://github.com/wconstab, https://github.com/fegin
2024-02-14 11:38:10 -08:00
e693fb2bb1 [nccl flight recorder] record time we discover start and complete (#119249)
Some APIs like ncclCommAbort can cause nccl kernels to finish even if
they were previously stuck. Because we can gather the trace buffer after
those calls, we can end up seeing some collectives marked completed even though
that completion happened several minutes after they started and clearly after
the timeout. This changes how we record state so that we keep track of the time
we discover a state change; so even if the collective eventually gets marked complete,
we can observe that it happened minutes after it was scheduled.

Pull Request resolved: https://github.com/pytorch/pytorch/pull/119249
Approved by: https://github.com/wconstab
2024-02-14 11:38:10 -08:00
4fe510baf6 [NCCL PG] log NCCL comm at creation and abort (#118335)
Summary: It helps correlate NCCL PG with corresponding NCCL comm in separate logs.

Differential Revision: D53107647

Pull Request resolved: https://github.com/pytorch/pytorch/pull/118335
Approved by: https://github.com/wconstab
2024-02-14 11:38:04 -08:00
7c507b78c4 [c10d] Expose check method to Python for store via pybind (#116144)
Differential Revision: [D52310987](https://our.internmc.facebook.com/intern/diff/D52310987)
Pull Request resolved: https://github.com/pytorch/pytorch/pull/116144
Approved by: https://github.com/wconstab
2024-01-31 11:08:27 -08:00
0019901601 [C10D] Fix nccl flightrecorder ignored dump timeout (#118142)
Don't call future.get() unless it's ready, because it waits.
Also, refactor the code a bit for simplicity.

We should do a follow-on PR to clean up the timeouts further, but this
should fix the glaring timeout bug.

Pull Request resolved: https://github.com/pytorch/pytorch/pull/118142
Approved by: https://github.com/shuqiangzhang
ghstack dependencies: #118044, #118046, #118047
2024-01-26 16:48:00 -08:00
18be18535b [C10D] Make Flight Recorder report time_created in ns (#118047)
Addresses (6) from #117883

Pull Request resolved: https://github.com/pytorch/pytorch/pull/118047
Approved by: https://github.com/zdevito
ghstack dependencies: #118044, #118046
2024-01-26 16:48:00 -08:00
2729367313 [C10D] Add version tag to NCCL Flight Recorder Dump (#118046)
Addresses (3) from #117883

Pull Request resolved: https://github.com/pytorch/pytorch/pull/118046
Approved by: https://github.com/zdevito
ghstack dependencies: #118044
2024-01-26 16:48:00 -08:00
33537aae24 [C10D] Make NCCL Flight Recorder dump produce a dict (#118044)
Putting the list of entries into a particular key of a top-level dict
paves the way for adding other metadata as other top level keys.

Addresses 1 and 2 from #117883

Pull Request resolved: https://github.com/pytorch/pytorch/pull/118044
Approved by: https://github.com/zdevito
2024-01-26 16:48:00 -08:00
dcdb1337dd [C10D] Finer-grain nccl heartbeat, avoid false positive hangs (#118016)
Summary:
Previously, the heartbeat was incremented once per completed for loop over a list
of in-progress work items, under the assumption that either the processing
would be predictably quick, or it would hang completely.

In fact, there can be CUDA API contention that causes the processing of work items
to slow down arbitrarily but not truly deadlock.  To guard against this, we
bump the heartbeat at the smallest unit of progress, one work item being
successfully processed.

Test Plan: CI

Differential Revision: D52973948

Pull Request resolved: https://github.com/pytorch/pytorch/pull/118016
Approved by: https://github.com/shuqiangzhang, https://github.com/kwen2501
2024-01-26 16:48:00 -08:00
9cf0f2bd59 Move getDurationFromFirstEvent to USE_C10D_NCCL ifdef (#117738)
Fixes #117517

Try to move the NCCL-related function *getDurationFromFirstEvent* under the USE_C10D_NCCL ifdef (related to https://github.com/pytorch/pytorch/issues/114575)

Pull Request resolved: https://github.com/pytorch/pytorch/pull/117738
Approved by: https://github.com/wconstab, https://github.com/XilunWu
2024-01-26 16:48:00 -08:00
1d2e877c05 [ProcessGroup] Make watchdog check work queue more frequently (#117297)
Today the watchdog's sleep interval is 1s. That's a bit long compared to the speed of modern GPU links (or network links).

Take DDP and Ampere for example:

DDP's bucket size = 25 MB
Ampere's NVLink speed = 250 GB/s

25 MB / 250 GB/s = 100 ms.
So we are updating the interval to 100 ms.

Update:
25 MB / 250 GB/s = 0.1 ms.
But let's see how it goes for now before making the checking more aggressive.
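A quick sanity check of the arithmetic in the update (a sketch, not part of the PR):
```python
bucket_bytes = 25 * 1024**2          # DDP bucket size: 25 MB
link_bytes_per_s = 250 * 1024**3     # Ampere NVLink: ~250 GB/s

transfer_ms = bucket_bytes / link_bytes_per_s * 1000
print(f"{transfer_ms:.3f} ms")       # ~0.098 ms, i.e. ~0.1 ms, not 100 ms
```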

Pull Request resolved: https://github.com/pytorch/pytorch/pull/117297
Approved by: https://github.com/fduwjj
2024-01-26 16:48:00 -08:00
f27b979b0c [c10d] Move the timeout dump check from watchdog to monitoring thread (#117168)
To avoid a potential hang in the watchdog thread, which would prevent us from dumping timeout debugging info, we move the check of global collective timeout signals and the dumping of debugging info to the monitoring thread. We also need to ensure that we don't wait too long to check the timeout signal from the store; otherwise, we will miss the signal and won't get the debugging info dumped.

Pull Request resolved: https://github.com/pytorch/pytorch/pull/117168
Approved by: https://github.com/wconstab
2024-01-26 16:48:00 -08:00
f30d6047ad [c10d] Add a timeout check interval variable for timeout dump (#117093)
The current timeout check frequency relies on the monitoring thread's timeout, which can be too long (even if we set it to 2 mins), so let's use a separate timeout variable that users can configure. And we only let the default PG check the TCPStore, so an even more frequent check should be fine. (Our stress test is performed every half second.)

Pull Request resolved: https://github.com/pytorch/pytorch/pull/117093
Approved by: https://github.com/wconstab, https://github.com/kwen2501
2024-01-26 16:48:00 -08:00
75311510ef [C10D] Add duration_ms to flight recorder (#114817)
Measures the duration of a collective operation using nccl start/end
events and includes this duration (in ms) in the flight recorder data.

duration_ms will be an optional field, since it only works when
timing is enabled.  Currently timing is enabled when flight recorder
is enabled, but this is not a strict requirement.  Duration is also
not available for collectives not in a completed state.

Note: computing duration can lead to a hang due to calling cudaEventDuration when
the cuda driver queue is full.

We don't ever want the dump() API to hang, since we might want dump to help
debug a hang. Hence, we only query durations from the watchdog thread,
and it's possible that during a dump() call some of the most recent
collectives' durations won't have been computed yet at the time of dump. We
make this tradeoff to ensure that dump() itself will never hang.
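A minimal sketch of consuming the new field from a dump (based on the tests in this stack; it assumes a ProcessGroupNCCL with the flight recorder enabled has already run some collectives, and duration_ms is optional so it is read defensively):
```python
import pickle
import torch

trace = pickle.loads(torch._C._distributed_c10d._dump_nccl_trace())
for entry in trace["entries"]:
    # duration_ms may be absent for untimed or not-yet-measured collectives.
    duration = entry.get("duration_ms")
    print(entry["profiling_name"], entry["state"], duration)
```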

Pull Request resolved: https://github.com/pytorch/pytorch/pull/114817
Approved by: https://github.com/fduwjj, https://github.com/zdevito
ghstack dependencies: #116905
2024-01-26 16:48:00 -08:00
dbd6094d05 [C10D](reland) Add GIL checker to NCCL watchdog monitor (#117312)
Whenever the monitor thread kills the watchdog thread for being stuck, we do so to save cluster time and get a faster failure signal, but we want to know more about why it got stuck.

One possible reason for watchdog stuckness is GIL contention, which could be ruled out or observed by making an attempt to acquire the GIL at exit time.

If we cannot acquire the GIL within a short time window (1s) we abort the attempt and report GIL contention, otherwise we report that GIL was acquired successfully.

Reland: uses a function pointer to avoid destructor ordering issues on dlclose. (Looks like the destructor for the std::function was being run later than the libtorchpython lib was unloaded, leading to a crash).

Pull Request resolved: https://github.com/pytorch/pytorch/pull/117312
Approved by: https://github.com/zdevito
2024-01-26 16:48:00 -08:00
397b9d47e9 [ProcessGroup] Do not print NCCL_DEBUG before NCCL init (#117328)
In case /etc/nccl.conf is used, `NCCL_DEBUG` is not set in the system env until NCCL inits.
The deleted print point is before NCCL inits, hence may be inaccurate.
This PR removes it and relies on the other print point which is after NCCL comm creation.

Pull Request resolved: https://github.com/pytorch/pytorch/pull/117328
Approved by: https://github.com/wconstab, https://github.com/fduwjj
2024-01-26 16:48:00 -08:00
36a01a8ab9 [c10d][EZ] Add more logs in the destructor of ProcessGroupNCCL for better root cause investigation (#117291)
Add logs to the place where we inspect whether a hang happens.

Pull Request resolved: https://github.com/pytorch/pytorch/pull/117291
Approved by: https://github.com/XilunWu, https://github.com/shuqiangzhang
2024-01-26 16:48:00 -08:00
ee336cf58a [c10d] Add comments to the rest environment variable within NCCLPG (#117092)
Not every environment variable within NCCLPG has comments; let's add comments to each of them.

Pull Request resolved: https://github.com/pytorch/pytorch/pull/117092
Approved by: https://github.com/kwen2501
ghstack dependencies: #116545
2024-01-26 16:48:00 -08:00
a9e2e745d7 [c10d] Add extra sleep in waitForDumpOrTimeout to ensure enough time for all ranks dump debug info (#116545)
We added an extra sleep and made it configurable so that users can set an extra wait to ensure all ranks have dumped the debug info.

Pull Request resolved: https://github.com/pytorch/pytorch/pull/116545
Approved by: https://github.com/wconstab
2024-01-26 16:48:00 -08:00
ab4df89eea [C10D] Rename flightrecorder key vars to avoid confusion (#116905)
Key vars are strings used as dict keys (e.g. duration_s was a string
"duration_ms")

_s confused me with time (seconds) since duration_s was a key string and
duration_ms is another variable holding a time value.

Now duration_key is "duration_ms".

Pull Request resolved: https://github.com/pytorch/pytorch/pull/116905
Approved by: https://github.com/zdevito
2024-01-26 16:48:00 -08:00
9d02ebe876 [c10d] To make ProcessGroupNCCL to use globalStore for coordination (#117075)
Pull Request resolved: https://github.com/pytorch/pytorch/pull/117075
Approved by: https://github.com/wconstab
ghstack dependencies: #117074
2024-01-26 16:48:00 -08:00
b61e01cce9 [c10d] Add a recursive method to get the inner most store (#117074)
In c10d PG initialization, we wrap the TCPStore with multiple layers of PrefixStore, which adds layers of prefixes.

One example is:
"default_pg/0//cuda//timeout_dump"
When initializing the default PG, because there is no store passed, we first add the prefix "default_pg" to the TCPStore returned from rendezvous:

bdeaaad70c/torch/distributed/distributed_c10d.py (L1240)

We then add pg_name (aka 0) bdeaaad70c/torch/distributed/distributed_c10d.py (L1376) and device (aka cuda) bdeaaad70c/torch/distributed/distributed_c10d.py (L1387)

to the prefix. Then, when we call store_->set("timeout_dump"), the actual key used for writing into the TCPStore is "default_pg/0//cuda//timeout_dump".

For sub-PGs, things get even more interesting: we put the store wrapped with the default PG name into a cache:
bdeaaad70c/torch/distributed/distributed_c10d.py (L1517)

And when creating each sub-PG, its PG name is appended right after the cached store's prefix. Example keys are:
'default_pg/0//10//cuda//timeout_dump', 'default_pg/0//12//cuda//timeout_dump', 'default_pg/0//38//cuda//timeout_dump', 'default_pg/0//39//cuda//timeout_dump'. (10, 12, 38 and 39 are all PG names of each subPG created)

The reason why the number in the name is bumped up so high is that for each sub-PG creation, all ranks have to call the API together, and the global variable used for the PG name is bumped up monotonically:

bdeaaad70c/torch/distributed/distributed_c10d.py (L3666)

Similar things happen for using hashing for PG names.

This has a potential issue: each sub-PG has its own instance of ProcessGroupNCCL, and if we want to set something global to notify all sub-PGs (and all ranks), this added prefix causes bugs. For example, if on sub-PG 1 we set a value in the TCPStore with the key 'default_pg/0//1//cuda//timeout_dump', while we use the default PG instances to check the TCPStore, which use the key 'default_pg/0//cuda//timeout_dump', the default PG instances will never get the notification signals. So in this PR, we added a new API to PrefixStore with which we get the innermost non-PrefixStore for set and check. The next PR will make changes in the NCCL watchdog.
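A minimal sketch of the new API (the Python attribute name is taken from the store test in this compare; host, port, and prefixes are placeholders):
```python
from datetime import timedelta

import torch.distributed as dist

tcp_store = dist.TCPStore("127.0.0.1", 29500, 1, True, timeout=timedelta(seconds=30))
wrapped = dist.PrefixStore("default_pg/0/", dist.PrefixStore("cuda/", tcp_store))

# Peels off every PrefixStore layer and returns the innermost store, so a key
# like "timeout_dump" can be set and read globally without the per-PG prefixes.
inner = wrapped._underlying_non_prefix_store
inner.set("timeout_dump", "1")
print(inner.get("timeout_dump"))  # b"1"
```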

Pull Request resolved: https://github.com/pytorch/pytorch/pull/117074
Approved by: https://github.com/wconstab, https://github.com/H-Huang
2024-01-26 16:48:00 -08:00
f7ce61ba53 [C10D] Dump cpp stacktraces on heartbeat monitor timeout (#116717)
Summary:
If heartbeat monitor times out and kills the process, we want to know why.

It's convenient to use an internal tool for this, but we plan to later
integrate with torchelastic to call into pyspy or something else, which will be
both better (including py stacks) and compatible with OSS.

Test Plan: tested manually, observed c++ stacktraces were dumped

Reviewed By: fduwjj

Differential Revision: D52370243

Pull Request resolved: https://github.com/pytorch/pytorch/pull/116717
Approved by: https://github.com/zdevito
2024-01-26 16:48:00 -08:00
e7bae15ab1 [C10D] Make heartbeat_ atomic (#116702)
Summary:
Currently, the code is working. We know this because we observe heartbeat
timeouts.

However, there is a chance that if the code were refactored, the compiler could
optimize away the load of heartbeat_ inside heartbeatMonitor, and we wouldn't
know.

Using an atomic here is not really for thread synchronization, but more to ensure
compiler optimizations (hoisting the read outside the loop) can never be
allowed to happen.  Again, we know this isn't currently happening because if it
were, it would not be an intermittent failure, it would be a consistent failure
(at least with a fixed compiler/platform).

I previously avoided an atomic because we didn't want shared locks between the heartbeat
monitor and the watchdog thread.  Why? If the watchdog held the lock and hung, the monitor
could also hang.  However, this really can't happen (AFAIK) when using an
atomic.

Test Plan: existing CI tests

Differential Revision: D52378257

Pull Request resolved: https://github.com/pytorch/pytorch/pull/116702
Approved by: https://github.com/fduwjj, https://github.com/zdevito
2024-01-26 16:48:00 -08:00
e71b422908 [C10D] Improve Heartbeat Monitor exit logs (#116268) (#116661)
Summary:

- Add workMetaList_.size() so we know how many outstanding work items there
  were when killing
- Print our first log before the debug-info dump instead of after, since it
  is clearer when reading the logs that we time out and then dump
- Organize the log strings - put them near where they are used

cc mrshenli pritamdamania87 zhaojuanmao satgera rohan-varma gqchen aazzolini osalpekar jiayisuse H-Huang kwen2501 awgu penguinwu fegin XilunWu wanchaol fduwjj wz337 tianyu-l yf225

imported-using-ghimport

Test Plan: Imported from OSS

Reviewed By: fduwjj

Differential Revision: D52369167

Pulled By: wconstab

Pull Request resolved: https://github.com/pytorch/pytorch/pull/116661
Approved by: https://github.com/fduwjj
2024-01-26 16:48:00 -08:00
389940ce60 [c10d] Make DebugInfoWriter Singleton across all PG objects (#116489)
Previously, we had the writer registered to each NCCL PG (backend), so for every PG we have an NCCL PG instance; if we use a customized writer when multiple sub-PGs are used, we need to ensure the user registers the writer for every backend, which is a bad UX. Furthermore, the debug info is global, so it does not make sense to have a writer per instance. We even have a static mutex in `dumpDebuggingInfo` to ensure we serialize the writes, which makes it more obvious that we can make the writer a singleton so that we only have one writer instance for all PG instances.

Although the rationale is clear, the implementation may vary a lot. So this PR is an RFC for now, to see if this implementation makes sense or not.

Pull Request resolved: https://github.com/pytorch/pytorch/pull/116489
Approved by: https://github.com/kwen2501
2024-01-26 16:48:00 -08:00
b2237a7c85 [C10d] Fix Log Prefix in NCCLPG so that each instance gets its own prefix (#116520)
Somehow the log prefix only has ProcessGroup 0 rank [global rank]. This does not give the expected result, as the comment says it should be "a prefix that is unique to this process group and rank". So this PR fixes it and makes the prefix different for different sub-PGs.

The reason is that we made the prefix static, so it is shared across all NCCLPG instances, and whoever calls this function first sets its `rank_` and `uid_` in the prefix. We always initialize PG 0 first; that's why we always see PG[0] + global ranks for all sub-PGs.

<img width="484" alt="image" src="https://github.com/pytorch/pytorch/assets/6937752/7fbb0226-7e25-4306-9cee-22e17b00bc8e">

Pull Request resolved: https://github.com/pytorch/pytorch/pull/116520
Approved by: https://github.com/wconstab
ghstack dependencies: #116218
2024-01-26 16:48:00 -08:00
ef5dfe3f3e [c10d] Fix timeout dump path write path overlap when there are multiple PGs (#116218)
Basically, we observed that if there are multiple PGs and the timeout happens on one of the sub-PGs, we somehow use the local rank in the dump file name. We realize that:
1. For setting the timeout signal in the store, any watchdog thread from any PG can do that.
2. For checking and dumping, only the watchdog thread of the default PG, which we always create and which contains all ranks (so no file-name conflict), is needed here, because the store signal and the dumped debug info are all global.
3. Since the dump is global, we want to avoid the case where ranks from a sub-PG pollute logs from global ranks (local rank 0 vs global rank 0). So we use global ranks here to initialize the debug info writer. (Down the road, we are thinking about making it a singleton so that users only register it once for the multi-PG case.)

Pull Request resolved: https://github.com/pytorch/pytorch/pull/116218
Approved by: https://github.com/wconstab
2024-01-26 16:48:00 -08:00
e303dc3c08 [c10d] Add stream info during nccl comm abort call (#116076)
Pull Request resolved: https://github.com/pytorch/pytorch/pull/116076
Approved by: https://github.com/XilunWu
2024-01-26 16:48:00 -08:00
265efad2de [C10D] Increase TORCH_NCCL_HEARTBEAT_TIMEOUT_SEC (#116267)
Change default from 2 min to 10 min.

Why? Many cases of heartbeat timeout were reported, but increasing
timeout led to the same job hanging in a different place, suggesting
heartbeat kill was working well and not a false positive.  However, some
others reported jobs running fine with increased timeouts.  One such
case was investigated below, and suggests that indeed a 2 min timeout is
too aggressive.  While we have not fully root caused the issue, it
is better to avoid killing jobs that would otherwise complete.

The current theory is that the watchdog is not totally deadlocked, but is slowed
down in its processing of work objects due to some intermittent resource
contention.  Hence, allowing more time is more of a workaround than a
fix.

Debug/Analysis:
https://docs.google.com/document/d/1NMNWoTB86ZpP9bqYLZ_EVA9byOlEfxw0wynMVEMlXwM

Differential Revision: [D52368791](https://our.internmc.facebook.com/intern/diff/D52368791)
Pull Request resolved: https://github.com/pytorch/pytorch/pull/116267
Approved by: https://github.com/fduwjj
2024-01-26 16:48:00 -08:00
60f0455905 [C10D] Make all PGNCCL LOG usages use logPrefix() (#116060)
Pull Request resolved: https://github.com/pytorch/pytorch/pull/116060
Approved by: https://github.com/fduwjj
ghstack dependencies: #116059
2024-01-26 16:48:00 -08:00
4898313791 [C10D] Add logPrefix to abortCommsFromMap (#116059)
Prints additional info such as PG ID/Rank.

Pull Request resolved: https://github.com/pytorch/pytorch/pull/116059
Approved by: https://github.com/fduwjj
2024-01-26 16:48:00 -08:00
f4da9adf6b [C10D] Add waitForDumpOrTimeout to log on dump abandonment (#115876)
Helps call attention to any cases where the dump actually times out.

The timeout is likely to hit if we run into slow stacktrace processing.

Log any exceptions encountered in the background thread, but don't raise
them- we're already willing to abandon the debug dump, and want to
proceed with our normal execution (in the case of dumppipe) or shutdown
process (when dumping happens on timeout and shutdown is already
initiated).

Pull Request resolved: https://github.com/pytorch/pytorch/pull/115876
Approved by: https://github.com/zdevito
ghstack dependencies: #115807
2024-01-26 16:48:00 -08:00
8f7f35273e [c10d] Polish NCCL PG monitor thread log message (#115888)
We turned on the monitor thread by default in https://github.com/pytorch/pytorch/pull/112518, and we want the error message displayed when the monitor kills the process to be more informative.

Pull Request resolved: https://github.com/pytorch/pytorch/pull/115888
Approved by: https://github.com/wconstab
2024-01-26 16:48:00 -08:00
44ec9612ed [C10D] Log PG size in init log (#115807)
Pull Request resolved: https://github.com/pytorch/pytorch/pull/115807
Approved by: https://github.com/XilunWu
2024-01-26 16:48:00 -08:00
4d3bea2b29 [nccl flight recorder] nullptr profiling name (#115851)
Sometimes the profiling name can be a nullptr, which
throws on conversion to std::string. This adds a check.

Pull Request resolved: https://github.com/pytorch/pytorch/pull/115851
Approved by: https://github.com/wconstab
2024-01-26 16:48:00 -08:00
0bcdddc3c1 [C10D] Make dumpDebuggingInfo share a mutex across PGs (#115803)
The mutex was originally added to avoid racing to dump debuginfo,
where a race in this case would result in a corrupted dump file.

The reason a mutex helps is that it forces all dump requests to be
serialized, so that an observer would either see an in-progress file, a
complete file, or no file.  Without a mutex, a fourth state is possible
(a file that has been written to by multiple threads and is invalid).

Because the mutex was a ProcessGroupNCCL class member, and each PG
instance has its own watchdog thread that can launch a dump, it was not
doing its job.  Making the mutex static shares it between instances of
the class and ensures serialization of dumps triggered by any PG.

(Note: dumps triggered by different PGs have the same, global contents
anyway- there is only one global flight recorder, so it doesn't matter
who triggers it.)

Pull Request resolved: https://github.com/pytorch/pytorch/pull/115803
Approved by: https://github.com/kwen2501
ghstack dependencies: #115771, #115798, #115800, #115801
2024-01-26 16:48:00 -08:00
28b6220312 [C10D] Change PGNCCL logs to prefix [PG {} Rank {}] (#115801)
Adds a PG {process group uid} prefix component to logs.

This is helpful in situations where there are multiple process groups,
and rank information by itself is confusing.  (For example, rank0 on PG1
may correspond to rank3 on PG0.  People may assume 'rank0' references
the global (PG0) world, but it may reference a sub-PG.  Prefixing the PG
helps clarify this.)

Does NOT change logs from inside WorkNCCL functions, since WorkNCCL
doesn't know what PG ID it corresponds to. Will address these logs
separately.

Example:

```
[I ProcessGroupNCCL.cpp:787] [PG 0 Rank 0] ProcessGroupNCCL initialization ...
```

Pull Request resolved: https://github.com/pytorch/pytorch/pull/115801
Approved by: https://github.com/fduwjj
ghstack dependencies: #115771, #115798, #115800
2024-01-26 16:48:00 -08:00
210b7b65e2 [C10D] Refactor NCCL logs to use common prefix helper (#115800)
Put the repeated code that string formats [Rank {rank}] in one place.

Sets up for the next PR that also adds more info to this prefix.

(Does not change exception messages, which could be done as well.
Exception messages are not formatted quite the same way. This tries
instead to keep from changing log behavior (in this PR) and only
refactor code.)

Did limited testing (some logs were observed OK).

Pull Request resolved: https://github.com/pytorch/pytorch/pull/115800
Approved by: https://github.com/fduwjj
ghstack dependencies: #115771, #115798
2024-01-26 16:48:00 -08:00
4da10b5cd3 [C10D] Only open NCCL dump pipe file once per process (#115798)
The NCCL flight recorder is per-process (it is shared by all
processgroups), but individual process groups used to construct their
own pipe for being signaled to dump the flight recorder.

This ensures that only one pipe per process is created, by only creating
the pipe on the first ProcessGroup (uid_ == 0) which should be the world
group.

Filenames are still keyed off of rank, but this should now be global
rank instead of sub-pg rank, making the filenames unique across the
whole trainer process.

Pull Request resolved: https://github.com/pytorch/pytorch/pull/115798
Approved by: https://github.com/zdevito
ghstack dependencies: #115771
2024-01-26 16:48:00 -08:00
f09763814f [C10D] Make DumpPipe disabled when FlightRecorder disabled (#115771)
Pull Request resolved: https://github.com/pytorch/pytorch/pull/115771
Approved by: https://github.com/fduwjj
2024-01-26 16:48:00 -08:00
80923ed5a6 [C10D] Make DumpPipe pipe file configurable (#115770)
Add the TORCH_NCCL_DEBUG_INFO_PIPE_FILE env var, allowing the pipe file
location to be separate from the dump file location.

Defaults PIPE_FILE to empty, meaning disabled.
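A minimal configuration sketch (the pipe path is a placeholder; the dump-file default is the one visible in the test diff below):
```python
import os

# Enable the flight recorder and dump-on-timeout, and open a per-rank control
# pipe at a location separate from the dump files. Must be set before
# ProcessGroupNCCL is created.
os.environ["TORCH_NCCL_TRACE_BUFFER_SIZE"] = "10"
os.environ["TORCH_NCCL_DUMP_ON_TIMEOUT"] = "1"
os.environ["TORCH_NCCL_DEBUG_INFO_TEMP_FILE"] = "/tmp/nccl_trace_rank_"
os.environ["TORCH_NCCL_DEBUG_INFO_PIPE_FILE"] = "/tmp/nccl_trace_pipe_"  # empty/unset = disabled
```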
Pull Request resolved: https://github.com/pytorch/pytorch/pull/115770
Approved by: https://github.com/zdevito
2024-01-26 16:48:00 -08:00
20 changed files with 1713 additions and 354 deletions

View File

@ -151,6 +151,10 @@ struct TORCH_CUDA_CPP_API CUDAEvent {
TORCH_CHECK(is_created_ && other.isCreated(),
"Both events must be recorded before calculating elapsed time.");
float time_ms = 0;
// We do not strictly have to set the device index to the same as our event,
// but if we don't and the current device is not initialized, it will
// create a new cuda context, which will consume a lot of memory.
CUDAGuard guard(device_index_);
// raise cudaErrorNotReady if either event is recorded but not yet completed
AT_CUDA_CHECK(cudaEventElapsedTime(&time_ms, event_, other.event_));
return time_ms;

View File

@ -1732,7 +1732,7 @@ if(BUILD_TEST)
foreach(test_src ${Caffe2_CPU_TEST_SRCS})
get_filename_component(test_name ${test_src} NAME_WE)
add_executable(${test_name} "${test_src}")
target_link_libraries(${test_name} torch_library gtest_main)
target_link_libraries(${test_name} torch_library gtest_main stdc++)
target_include_directories(${test_name} PRIVATE $<INSTALL_INTERFACE:include>)
target_include_directories(${test_name} PRIVATE $<BUILD_INTERFACE:${CMAKE_BINARY_DIR}/include>)
target_include_directories(${test_name} PRIVATE ${Caffe2_CPU_INCLUDE})

View File

@ -371,7 +371,8 @@ std::string readTraceFromFile(const std::string& filename, size_t size) {
// Extend the nested class outside the parent class
class TestDebugInfoWriter : public c10d::DebugInfoWriter {
public:
TestDebugInfoWriter() : DebugInfoWriter(0) {}
TestDebugInfoWriter(std::string namePrefix)
: DebugInfoWriter(namePrefix, 0) {}
void write(const std::string& ncclTrace) override {
traces_.assign(ncclTrace.begin(), ncclTrace.end());
@ -415,10 +416,12 @@ TEST_F(ProcessGroupNCCLErrorsTest, testNCCLErrorsNoHeartbeat) {
// The storer here is very similar to the fallback storer.
// The only difference is that we are storing traces also in memory for
// validation.
std::string fileNamePrefix = c10d::getCvarString(
{"TORCH_NCCL_DEBUG_INFO_TEMP_FILE"}, "/tmp/nccl_trace_rank_");
std::unique_ptr<TestDebugInfoWriter> wrterForTestPtr =
std::make_unique<TestDebugInfoWriter>();
std::make_unique<TestDebugInfoWriter>(fileNamePrefix);
std::vector<uint8_t>& traces = wrterForTestPtr->getTraces();
pg.registerDebugInfoWriter(std::move(wrterForTestPtr));
c10d::DebugInfoWriter::registerWriter(std::move(wrterForTestPtr));
// Normal collective case.
auto work = pg.allreduce(tensors_);
@ -449,6 +452,9 @@ TEST_F(ProcessGroupNCCLErrorsTest, testNCCLErrorsNoHeartbeat) {
class ProcessGroupNCCLWatchdogTimeoutTest : public ProcessGroupNCCLErrorsTest {
protected:
void SetUp() override {
// TODO (kwen2501)
GTEST_SKIP() << "Skipping tests under ProcessGroupNCCLWatchdogTimeoutTest; "
<< "will rewrite them after refactoring Work queues.";
ProcessGroupNCCLErrorsTest::SetUp();
std::string timeInterval = std::to_string(heartBeatIntervalInSec);
ASSERT_TRUE(setenv(c10d::TORCH_NCCL_BLOCKING_WAIT[0].c_str(), "1", 1) == 0);

View File

@ -4,9 +4,10 @@ import torch
import torch.distributed as dist
import torch.distributed._functional_collectives as funcol
from torch.distributed._tensor import DTensor
from torch.distributed._tensor.placement_types import Shard
from torch.distributed.checkpoint._state_dict_utils import (
from torch.distributed._state_dict_utils import (
_check_state_dict_similarity,
_copy_state_dict,
_create_cpu_state_dict,
_gather_state_dict,
_offload_state_dict_to_cpu,
)
@ -115,6 +116,58 @@ class TestStateDictUtils(DTensorTestBase):
}
self.assertEqual(state_dict, _gather_state_dict(dist_state_dict))
@skip_if_lt_x_gpu(2)
def test_create_cpu_state_dict(self):
device = torch.device("cuda")
buffer = io.BytesIO()
torch.save(torch.ones(10), buffer)
buffer.seek(0)
state_dict = {
"tensor1": torch.arange(10, device=device),
"tensor2": torch.ones(10, device=device),
"non_tensor_bytes_io": copy.deepcopy(buffer),
"non_tensor_bytes": buffer.read(),
"step": torch.tensor(7, dtype=torch.float),
"lr": 1.5,
"nested": {"list": [1, 2, 3, 4]},
}
def _verify(cpu_state_dict):
# Verify the correctness of _check_state_dict_similarity()
self.assertTrue(_check_state_dict_similarity(state_dict, cpu_state_dict))
tensor1 = cpu_state_dict["tensor1"]
cpu_state_dict["tensor1"] = torch.arange(11)
self.assertFalse(_check_state_dict_similarity(state_dict, cpu_state_dict))
cpu_state_dict["tensor1"] = tensor1
_copy_state_dict(state_dict, cpu_state_dict)
# Verify if _copy_state_dict works
for v in cpu_state_dict.values():
if isinstance(v, torch.Tensor):
self.assertFalse(v.is_cuda)
self.assertEqual(cpu_state_dict["tensor1"], torch.arange(10))
self.assertEqual(cpu_state_dict["tensor2"], torch.ones(10))
buffer.seek(0)
cpu_state_dict["non_tensor_bytes_io"].seek(0)
self.assertEqual(
cpu_state_dict["non_tensor_bytes_io"].read(), buffer.read()
)
buffer.seek(0)
self.assertEqual(cpu_state_dict["non_tensor_bytes"], buffer.read())
self.assertEqual(cpu_state_dict["lr"], 1.5)
self.assertEqual(cpu_state_dict["step"], 7)
self.assertEqual(cpu_state_dict["nested"], {"list": [1, 2, 3, 4]})
cpu_state_dict = _create_cpu_state_dict(state_dict, pin_memory=True)
_verify(cpu_state_dict)
cpu_state_dict = _create_cpu_state_dict(state_dict, share_memory=True)
_verify(cpu_state_dict)
cpu_state_dict = _create_cpu_state_dict(
state_dict, share_memory=True, pin_memory=True
)
_verify(cpu_state_dict)
if __name__ == "__main__":
run_tests()

View File

@ -11,6 +11,7 @@ import tempfile
import threading
import pickle
import time
import json
import warnings
from contextlib import contextmanager
from datetime import datetime, timedelta
@ -1334,6 +1335,19 @@ class ProcessGroupNCCLTest(MultiProcessTestCase):
self.assertEqual(tensor, original_tensor)
@requires_nccl()
@skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
def test_set_process_group_desc(self):
store = c10d.FileStore(self.file_name, self.world_size)
device = torch.device(f'cuda:{self.rank}')
pg_default = self._create_process_group_nccl(store, self.opts(), device_id=device)
self.assertEqual(pg_default.group_desc, "default_pg")
pg_1 = c10d.new_group([0, 1], group_desc="test_purpose")
self.assertEqual(pg_1.group_desc, "test_purpose")
pg_2 = c10d.new_group([0, 1])
self.assertEqual(pg_2.group_desc, "undefined")
class DistributedDataParallelTest(
test_c10d_common.CommonDistributedDataParallelTest, MultiProcessTestCase
):
@ -3542,11 +3556,12 @@ class SparseCollective(MultiProcessTestCase):
class NCCLTraceTestBase(MultiProcessTestCase):
def setUp(self):
super().setUp()
os.environ["TORCH_NCCL_ENABLE_TIMING"] = '0'
os.environ["TORCH_NCCL_ENABLE_TIMING"] = '0' # see 'timing_enabled' parametrized tests
os.environ["TORCH_NCCL_TRACE_BUFFER_SIZE"] = '10'
os.environ["TORCH_NCCL_DUMP_ON_TIMEOUT"] = '1'
self.tempdir = tempfile.TemporaryDirectory()
os.environ["TORCH_NCCL_DEBUG_INFO_TEMP_FILE"] = self._trace_basename()
os.environ["TORCH_NCCL_DEBUG_INFO_PIPE_FILE"] = self._trace_basename()
self._spawn_processes()
@classmethod
@ -3617,28 +3632,50 @@ class NCCLTraceTest(NCCLTraceTestBase):
@requires_nccl()
@skip_but_pass_in_sandcastle_if(torch.cuda.device_count() < 2, "NCCL test requires 2+ GPUs")
def test_short(self):
@parametrize("timing_enabled", [True, False])
def test_short(self, timing_enabled):
if self.rank == self.MAIN_PROCESS_RANK:
return
pg = self._create_process_group_nccl()
if timing_enabled:
pg._enable_collectives_timing()
device = self.local_device
a = torch.full((3, 4), float(self.rank), device=device)
for i in range(2):
f = pg.allreduce(a)
f.wait()
torch.cuda.synchronize(device=device)
# gah ok so now the duration_ms is populated best-effort since it can only happen outside "dump()" api
time.sleep(1)
t = pickle.loads(torch._C._distributed_c10d._dump_nccl_trace())
ver = t['version']
self.assertEqual(ver, "1.1")
t = t['entries']
self.assertEqual(len(t), 2)
last = t[-1]
self.assertEqual(last['process_group'], ('0', 'default_pg'))
self.assertEqual(last['state'], 'completed')
s = last['time_discovered_started_ns']
f = last['time_discovered_completed_ns']
self.assertIsNotNone(f)
if timing_enabled:
self.assertIsNotNone(s)
self.assertTrue(s <= f)
self.assertIn('test_c10d_nccl.py', str(last['frames']))
self.assertEqual(last['input_sizes'], ((3, 4),))
self.assertEqual(last['output_sizes'], ((3, 4),))
self.assertEqual(last['seq_id'], 2)
now = datetime.now()
event_created_time = datetime.fromtimestamp(last['time_created_us'] / 1000000)
event_created_time = datetime.fromtimestamp(last['time_created_ns'] / 1000000000)
before_test = now - timedelta(minutes=1)
self.assertTrue(before_test < event_created_time < now)
if timing_enabled:
# very loose bounds, measured 0.036 ms on devgpu
self.assertTrue(0 < last['duration_ms'] < 100)
else:
self.assertTrue("duration_ms" not in last)
@requires_nccl()
@skip_but_pass_in_sandcastle_if(torch.cuda.device_count() < 2, "NCCL test requires 2+ GPUs")
@ -3698,9 +3735,11 @@ class NCCLTraceTest(NCCLTraceTestBase):
f.wait()
torch.cuda.synchronize(device=device)
t = pickle.loads(torch._C._distributed_c10d._dump_nccl_trace())
t = t['entries']
self.assertEqual(len(t), 10)
first = t[0]
last = t[-1]
self.assertEqual(last['profiling_name'], 'nccl:all_reduce')
self.assertEqual(last['state'], 'completed')
self.assertIn('test_c10d_nccl.py', str(last['frames']))
self.assertEqual(last['input_sizes'], ((3, 4),))
@ -3732,6 +3771,8 @@ class NCCLTraceTest(NCCLTraceTestBase):
pg.allreduce(a).wait()
e.synchronize()
t = pickle.loads(torch._C._distributed_c10d._dump_nccl_trace())
t = t['entries']
self.assertEqual(t[-1]['profiling_name'], 'nccl:all_reduce')
if self.rank == 0:
self.assertEqual(t[-1]['seq_id'], 1)
self.assertEqual(t[-1]['state'], 'completed')
@ -3773,12 +3814,15 @@ class NCCLTraceTest(NCCLTraceTestBase):
# give the other thread some time to fill the cuda buffer
time.sleep(5)
t = pickle.loads(torch._C._distributed_c10d._dump_nccl_trace())
t = t['entries']
self.assertEqual(t[-1]['profiling_name'], 'nccl:all_reduce')
if self.rank == 0:
self.assertEqual(t[-1]['seq_id'], 1)
self.assertEqual(t[-1]['state'], 'completed')
else:
self.assertEqual(t[-1]['seq_id'], 2)
self.assertEqual(t[-1]['state'], self.started_or_scheduled(timing_enabled))
self.assertIsNone(t[-1]['time_discovered_completed_ns'])
# this will eventually cause the missing rank 0
# to continue which will unblock the non-zero ranks
self.parent.send('next')
@ -3832,6 +3876,10 @@ class NCCLTraceTestDumpOnTimeout(NCCLTraceTestDumpOnTimeoutBase):
@skip_but_pass_in_sandcastle_if(torch.cuda.device_count() < 2, "NCCL test requires 2+ GPUs")
@parametrize("timing_enabled", [True, False])
def test_timeout_dumps(self, timing_enabled):
# dump on heartbeatmonitor thread
os.environ['TORCH_NCCL_COORD_CHECK_MILSEC'] = '1000'
# need rank0 to crash before looking for its output file
os.environ['TORCH_NCCL_HEARTBEAT_TIMEOUT_SEC'] = '1'
if self.rank == self.MAIN_PROCESS_RANK:
# wait for rank0 to crash before looking for its output file
@ -3839,6 +3887,7 @@ class NCCLTraceTestDumpOnTimeout(NCCLTraceTestDumpOnTimeoutBase):
self.assertEqual(self._wait_process(0, timeout=90), -6)
with open(self._trace_name(rank=0), 'rb') as f:
t = pickle.load(f)
t = t['entries']
self.assertEqual(len(t), 2)
self.assertEqual(t[0]['seq_id'], 1)
self.assertEqual(t[0]['state'], 'completed')
@ -3868,7 +3917,7 @@ class NCCLTraceTestDumpOnTimeout(NCCLTraceTestDumpOnTimeoutBase):
instantiate_parametrized_tests(NCCLTraceTestDumpOnTimeout)
instantiate_parametrized_tests(NCCLTraceTest)
class NCCLTraceTestTimeoutDumpOnIdleRanks(NCCLTraceTestDumpOnTimeoutBase):
class NCCLTraceTestTimeoutDumpOnStuckRanks(NCCLTraceTestDumpOnTimeoutBase):
def _check_return_codes(self, elapsed_time):
# the base test infra assumes processes exit with matching return codes,
# but we want rank0 to abort and rank1 to exit cleanly in this test
@ -3877,7 +3926,7 @@ class NCCLTraceTestTimeoutDumpOnIdleRanks(NCCLTraceTestDumpOnTimeoutBase):
@requires_nccl()
@skip_but_pass_in_sandcastle_if(torch.cuda.device_count() < 2, "NCCL test requires 2+ GPUs")
def test_timeout_dumps_on_idle_ranks(self):
def test_timeout_dumps_on_stuck_ranks(self):
if self.rank == self.MAIN_PROCESS_RANK:
# wait for both rank0 and 1 to crash before looking for both ranks' output
@ -3888,20 +3937,17 @@ class NCCLTraceTestTimeoutDumpOnIdleRanks(NCCLTraceTestDumpOnTimeoutBase):
self.assertTrue(os.path.exists(self._trace_name(rank=0)))
with open(self._trace_name(rank=0), 'rb') as f:
t = pickle.load(f)
t = t['entries']
self.assertEqual(len(t), 2)
with open(self._trace_name(rank=1), 'rb') as f:
t = pickle.load(f)
t = t['entries']
self.assertEqual(len(t), 1)
self.assertEqual(t[0]['seq_id'], 1)
self.assertEqual(t[0]['state'], 'completed')
return
# Set heartbeat timeout to a shorter one (default timeout is 2 min).
os.environ[
"TORCH_NCCL_HEARTBEAT_TIMEOUT_SEC"
] = f"{NCCLTraceTestDumpOnTimeoutBase.timeout_sec * 2}"
pg = self._create_process_group_nccl()
device = self.local_device
with torch.cuda.device(device):
a = torch.full((3, 4), float(self.rank), device=device)
@ -3910,12 +3956,68 @@ class NCCLTraceTestTimeoutDumpOnIdleRanks(NCCLTraceTestDumpOnTimeoutBase):
if self.rank == 0:
pg.allreduce(a).wait()
# rank 0 will crash before it passes the sync, but rank1 will exit quickly and cleanly
# rank 0 will get stuck, timeout and then signal a timeout to all ranks.
torch.cuda.synchronize()
# Force rank 1 to idle so that it also gets debug info dump triggered.
if self.rank == 1:
time.sleep(6)
# Force rank 1 to idle so that it will eventually timeout as well after
# getting the global signal to dump the debugging info.
time.sleep(600)
class NcclErrorDumpTest(NCCLTraceTestBase):
def _wait_process(self, rank, timeout):
try:
self.processes[rank].join(timeout)
return self.processes[rank].exitcode
except TimeoutError:
return None
def _check_return_codes(self, elapsed_time):
# the base test infra assumes processes exit with matching return codes,
# but we want rank0 to abort with exception and rank1 to exit with exit 1
self.assertEqual(self.processes[0].exitcode, -6)
self.assertEqual(self.processes[1].exitcode, 1)
@requires_nccl()
@requires_nccl_version((2, 4, 0), "Need NCCL 2.4+ for error checking")
@skip_if_lt_x_gpu(2)
@skip_if_rocm
def test_nccl_errors_dump(self):
os.environ["TORCH_NCCL_ASYNC_ERROR_HANDLING"] = "1"
os.environ["TORCH_NCCL_TRACE_BUFFER_SIZE"] = '1000'
os.environ["TORCH_NCCL_DUMP_ON_TIMEOUT"] = '1'
# need rank0 to dump before abort
os.environ['TORCH_NCCL_HEARTBEAT_TIMEOUT_SEC'] = '5'
if self.rank == self.MAIN_PROCESS_RANK:
# wait for both rank0 and 1 to crash before looking for dump
self.assertEqual(self._wait_process(0, timeout=90), -6)
self.assertEqual(self._wait_process(1, timeout=90), 1)
# verify that the trace file exists for rank0
self.assertTrue(os.path.exists(self._trace_name(rank=0)))
return
store = c10d.FileStore(self.file_name, self.world_size)
process_group = c10d.ProcessGroupNCCL(
store,
self.rank,
self.world_size,
timeout=timedelta(seconds=10),
)
process_group.allreduce(torch.rand(10).cuda(self.rank))
if self.rank == 0:
work = process_group.allreduce(torch.rand(10).cuda(self.rank))
# expect an error to be raised
with self.assertRaisesRegex(dist.DistBackendError, ""):
# Block the current stream on the NCCL stream
work.wait()
# Run some GPU operations
a = torch.rand(10).cuda(self.rank)
elif self.rank == 1:
# Clean up structures (ex: files for FileStore before going down)
del process_group
sys.exit(1)
if __name__ == "__main__":
assert (

View File

@ -71,7 +71,7 @@ class StoreTestBase:
def _create_store(self, i):
raise RuntimeError("not implemented")
def _test_set_get(self, fs):
def _test_set_get_check(self, fs):
fs.add("key", 1)
fs.add("key", 2)
fs.add("key", 3)
@ -90,14 +90,16 @@ class StoreTestBase:
self.assertEqual(b"value1", fs.get("key1"))
self.assertEqual(b"value2", fs.get("key2"))
self.assertEqual(b"21", fs.get("key3"))
self.assertTrue(fs.check(["key3"]))
self.assertFalse(fs.check(["Randomkey3"]))
fs.set("-key3", "7")
self.assertEqual(b"7", fs.get("-key3"))
fs.delete_key("-key3")
self.assertEqual(fs.num_keys(), self.num_keys_total)
def test_set_get(self):
self._test_set_get(self._create_store())
def test_set_get_check(self):
self._test_set_get_check(self._create_store())
def _test_compare_set(self, store):
missing_key_result = store.compare_set("cs_key0", "wrong_old_value", "new_value0")
@ -441,6 +443,12 @@ class PrefixTCPStoreTest(TestCase, StoreTestBase):
def num_keys_total(self):
return 6
def test_underlying_non_prefix_store(self):
store = self._create_store()
wrapped_store = dist.PrefixStore(self.prefix, dist.PrefixStore(self.prefix, store))
self.assertEqual(self.tcpstore, store._underlying_non_prefix_store)
self.assertEqual(self.tcpstore, wrapped_store._underlying_non_prefix_store)
class MyPythonStore(dist.Store):
def __init__(self):
super().__init__()

View File

@ -463,6 +463,7 @@ class ProcessGroup:
backend: Optional[ProcessGroup],
) -> None: ...
def _set_group_name(self, name: str) -> None: ...
def _set_group_desc(self, desc: str) -> None: ...
def name(self) -> str: ...
def _has_hooks(self) -> bool: ...
def _wait_for_pending_works(self) -> None: ...
@ -471,6 +472,10 @@ class ProcessGroup:
def bound_device_id(self) -> Optional[torch.device]: ...
@bound_device_id.setter
def bound_device_id(self, device: Optional[torch.device]) -> None: ...
@property
def group_name(self) -> str: ...
@property
def group_desc(self) -> str: ...
class ProcessGroupRoundRobin(ProcessGroup): ...

View File

@ -369,6 +369,14 @@ class TORCH_API Backend : public torch::CustomClassHolder {
return pg_name_;
}
void setGroupDesc(const std::string& desc) {
pg_desc_ = desc;
}
const std::string& getGroupDesc() const {
return pg_desc_;
}
// See similar functions in ProcessGroup.hpp for context.
c10::optional<at::Device> getBoundDeviceId() const {
return bound_device_id_;
@ -399,6 +407,7 @@ class TORCH_API Backend : public torch::CustomClassHolder {
// remains the same across use of this process group.
DebugLevel dist_debug_level_;
std::string pg_name_;
std::string pg_desc_;
std::function<void(std::shared_ptr<WorkInfo>)> onCompletionHook_;

View File

@ -8,6 +8,7 @@
#include <memory>
#include <mutex>
#include <ATen/ATen.h>
#include <c10/util/Exception.h>
#include <c10/util/Optional.h>
#include <nccl.h>
@ -175,14 +176,29 @@ std::string getNcclErrorDetailStr(
c10::optional<std::string> processGroupFailureReason = c10::nullopt);
// Write NCCL debug info to local disk or any storage users define.
// There are some constrains we set for the debug info writer:
// 1. The writer should only be registered once.
// 2. Once registered, users cannot change it including un-register.
// 3. It is recommended to register the customized writer in the trainer setup.
//    If users don't register one before launchAsyncDebugDump is called, they lose
//    the chance to register (and the default writer will be auto-registered).
class TORCH_API DebugInfoWriter {
public:
DebugInfoWriter(int rank);
virtual ~DebugInfoWriter();
virtual void write(const std::string& ncclTrace);
static DebugInfoWriter& getWriter(int rank);
static void registerWriter(std::unique_ptr<DebugInfoWriter> writer);
protected:
DebugInfoWriter(std::string namePrefix, int rank) {
filename_ = c10::str(namePrefix, rank);
}
std::string filename_;
private:
static std::unique_ptr<DebugInfoWriter> writer_;
static std::atomic<bool> hasWriterRegistered_;
};
// RAII wrapper for NCCL communicator
@ -267,6 +283,18 @@ class NCCLComm {
}
#endif
#if defined(IS_NCCL_EXP) && defined(NCCL_COMM_DUMP)
std::unordered_map<std::string, std::string> ncclCommDump() {
std::unordered_map<std::string, std::string> dump;
if (isAborted()) {
LOG(INFO) << "Communicator was aborted before trying to dump its state.";
return dump;
}
C10D_NCCL_CHECK(::ncclCommDump(ncclComm_, dump), c10::nullopt);
return dump;
}
#endif
ncclUniqueId getNcclId() {
return ncclId_;
}
@ -322,6 +350,9 @@ class NCCLComm {
// Set true failure reason if provided by ProcessGroupNCCL (e.g. work
// timeout)
commFailureReason_ = commFailureReason;
LOG(INFO) << "Aborting ncclComm_ " << ncclComm_ << " with reason: "
<< (commFailureReason ? *commFailureReason
: "No abort reason provided.");
#ifndef NCCL_HAS_COMM_NONBLOCKING
C10D_NCCL_CHECK(::ncclCommAbort(ncclComm_), commFailureReason_);
#else
@ -421,6 +452,8 @@ class NCCLComm {
#endif
}
friend class ProcessGroupNCCL;
protected:
ncclComm_t ncclComm_;
// Unique nccl_id for this communicator.

View File

@ -108,4 +108,22 @@ c10::intrusive_ptr<Store> PrefixStore::getUnderlyingStore() {
return store_;
}
c10::intrusive_ptr<Store> PrefixStore::getUnderlyingNonPrefixStore() {
c10::intrusive_ptr<Store> store = store_;
while (store) {
// Attempt to dynamically cast to PrefixStore
PrefixStore* asPrefixStore = dynamic_cast<PrefixStore*>(store.get());
if (asPrefixStore) {
store = asPrefixStore->getUnderlyingStore();
} else {
break; // We've reached a non-PrefixStore
}
}
TORCH_CHECK(
store != nullptr, "Underlying Non-PrefixStore shouldn't be null.");
return store;
}
} // namespace c10d
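The new getUnderlyingNonPrefixStore simply peels PrefixStore layers until it reaches a non-prefix store. A minimal Python sketch of the behavior, mirroring the test added above (host/port values are illustrative only):

import torch.distributed as dist
from datetime import timedelta

# A raw TCPStore wrapped twice; the prefixes are arbitrary.
tcp_store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
wrapped = dist.PrefixStore("outer", dist.PrefixStore("inner", tcp_store))

print(wrapped.underlying_store)               # the inner PrefixStore (one layer peeled)
print(wrapped._underlying_non_prefix_store)   # the original TCPStore (all layers peeled)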

View File

@ -53,6 +53,9 @@ class TORCH_API PrefixStore : public Store {
c10::intrusive_ptr<Store> getUnderlyingStore();
// Recursively fetch the store underneath all layers of PrefixStore wrapping.
c10::intrusive_ptr<Store> getUnderlyingNonPrefixStore();
protected:
std::string prefix_;
c10::intrusive_ptr<Store> store_;

View File

@ -165,6 +165,18 @@ void ProcessGroup::setGroupName(const std::string& name) {
}
}
const std::string& ProcessGroup::getGroupDesc() const {
return pg_desc_;
}
void ProcessGroup::setGroupDesc(const std::string& name) {
pg_desc_ = name;
// Also set the group desc for all backends
for (auto& kv : deviceTypeToBackend_) {
kv.second->setGroupDesc(name);
}
}
void ProcessGroup::enableCollectivesTiming() {
for (auto& kv : deviceTypeToBackend_) {
kv.second->enableCollectivesTiming();

View File

@ -694,6 +694,8 @@ class TORCH_API ProcessGroup : public torch::CustomClassHolder {
const std::string& getGroupName() const;
void setGroupName(const std::string& name);
const std::string& getGroupDesc() const;
void setGroupDesc(const std::string& name);
void enableCollectivesTiming();
void release_resources() override;
@ -724,6 +726,7 @@ class TORCH_API ProcessGroup : public torch::CustomClassHolder {
const int size_;
const c10::intrusive_ptr<Options> options_;
const BackendType backendType_;
std::string pg_desc_;
// Debug level setting. It is parsed once when ProcessGroup is constructed and
// remains the same across use of this process group.

File diff suppressed because it is too large

View File

@ -1,7 +1,15 @@
#pragma once
#if defined(__linux__)
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#endif
#ifdef USE_C10D_NCCL
#include <atomic>
#include <chrono>
#include <future>
#include <iostream>
@ -12,6 +20,7 @@
#include <torch/csrc/distributed/c10d/Backend.hpp>
#include <torch/csrc/distributed/c10d/NCCLUtils.hpp>
#include <torch/csrc/distributed/c10d/PrefixStore.hpp>
#include <torch/csrc/distributed/c10d/Store.hpp>
#include <ATen/DynamicLibrary.h>
@ -26,52 +35,74 @@
#include <torch/custom_class.h>
namespace c10d {
// Environment variable which controls whether we perform a NCCL healt check
// Control whether we perform a NCCL health check or not
// which ensures communicators are healthy at the beginning of init.
static std::vector<std::string> TORCH_ENABLE_NCCL_HEALTH_CHECK = {
"TORCH_ENABLE_NCCL_HEALTH_CHECK",
"ENABLE_NCCL_HEALTH_CHECK"};
// Environment variable which controls whether or not wait() is blocking or
// non-blocking.
// Control whether or not wait() is blocking or non-blocking.
static std::vector<std::string> TORCH_NCCL_BLOCKING_WAIT = {
"TORCH_NCCL_BLOCKING_WAIT",
"NCCL_BLOCKING_WAIT"};
// Environment variable which controls whether or not we perform Async Error
// Handling with NCCL.
// Control whether or not we perform Async Error Handling with NCCL.
static std::vector<std::string> TORCH_NCCL_ASYNC_ERROR_HANDLING = {
"TORCH_NCCL_ASYNC_ERROR_HANDLING",
"NCCL_ASYNC_ERROR_HANDLING"};
// Environment Variable to control whether dumping debug info on watchdog
// Control whether dumping debug info on watchdog
// timeout is enabled. This variable must be set together with
// TORCH_NCCL_ENABLE_MONITORING=1 and TORCH_NCCL_TRACE_BUFFER_SIZE > 0.
static std::vector<std::string> TORCH_NCCL_DUMP_ON_TIMEOUT = {
"TORCH_NCCL_DUMP_ON_TIMEOUT"};
// Environment Variable to control whether Desync Debug is enabled.
// This variable must be set together with TORCH_NCCL_ASYNC_ERROR_HANDLING.
// Control whether Desync Debug is enabled. This variable must be set
// together with TORCH_NCCL_ASYNC_ERROR_HANDLING.
static std::vector<std::string> TORCH_NCCL_DESYNC_DEBUG = {
"TORCH_NCCL_DESYNC_DEBUG",
"NCCL_DESYNC_DEBUG"};
// Enable recording start-events for all ProcessGroupNCCL collectives, and
// compute accurate collective timing per-collective. (Note: end-events are
// recorded by default. Turning on this flag can increase the chance of a watchdog
// hang due to performing a CUDA event query, which eventually calls the
// cudaEventElapsedTime() API.)
static std::vector<std::string> TORCH_NCCL_ENABLE_TIMING = {
"TORCH_NCCL_ENABLE_TIMING",
"NCCL_ENABLE_TIMING"};
// Enable monitoring thread which aborts the process when the ProcessGroupNCCL
// Watchdog thread gets stuck and no heartbeat is detected after
// TORCH_NCCL_HEARTBEAT_TIMEOUT_SEC. This can happen due to calling CUDA/NCCL
// APIs that may hang. It is useful to prevent a job from being stuck for longer
// than necessary and tying up cluster resources.
static std::vector<std::string> TORCH_NCCL_ENABLE_MONITORING = {
"TORCH_NCCL_ENABLE_MONITORING"};
// Control the watchdog heartbeat timeout period after which the monitoring
// thread will abort the process.
static std::vector<std::string> TORCH_NCCL_HEARTBEAT_TIMEOUT_SEC = {
"TORCH_NCCL_HEARTBEAT_TIMEOUT_SEC"};
// The maximum number of events we store in the flight recorder's ring buffer.
// (One event could be the start or end of a collective, for example).
static std::vector<std::string> TORCH_NCCL_TRACE_BUFFER_SIZE = {
"TORCH_NCCL_TRACE_BUFFER_SIZE"};
// Control how much extra time we will wait for dumping the debugging info
// before we exit and throw a timeout exception.
static std::vector<std::string> TORCH_NCCL_WAIT_TIMEOUT_DUMP_MILSEC = {
"TORCH_NCCL_WAIT_TIMEOUT_DUMP_MILSEC"};
// Control the interval inside the watchdog thread to check the coordinated
// signal from other ranks, e.g. to dump the debugging information.
static std::vector<std::string> TORCH_NCCL_COORD_CHECK_MILSEC = {
"TORCH_NCCL_COORD_CHECK_MILSEC"};
constexpr const char* NCCL_BACKEND_NAME = "nccl";
constexpr const char* TIMEOUT_DUMP = "timeout_dump";
constexpr const char* EXCEPTION_DUMP = "exception_dump";
constexpr auto kProcessGroupNCCLDefaultTimeout =
std::chrono::milliseconds(10 * 60 * 1000);
@ -110,6 +141,59 @@ static std::vector<std::string> TORCH_NCCL_USE_TENSOR_REGISTER_ALLOCATOR_HOOK =
{"TORCH_NCCL_USE_TENSOR_REGISTER_ALLOCATOR_HOOK",
"NCCL_USE_TENSOR_REGISTER_ALLOCATOR_HOOK"};
#if defined(__linux__)
struct DumpPipe {
DumpPipe(int rank) {
std::string fileStem =
getCvarString({"TORCH_NCCL_DEBUG_INFO_PIPE_FILE"}, "");
if (fileStem.empty() ||
getCvarInt({"TORCH_NCCL_TRACE_BUFFER_SIZE"}, 0) <= 0) {
return;
}
TORCH_CHECK(!fileStem.empty(), "TORCH_NCCL_DEBUG_INFO_PIPE_FILE is empty");
std::string filename = c10::str(fileStem, rank, ".pipe");
TORCH_CHECK(
unlink(filename.c_str()) != -1 || errno == ENOENT,
"Error removing existing named pipe ",
filename);
TORCH_CHECK(
mkfifo(filename.c_str(), 0666) != -1,
"Error creating named pipe ",
filename);
fd_ = open(filename.c_str(), O_RDONLY | O_NONBLOCK);
LOG(INFO) << "Pipe file " << filename
<< " has been opened, write to it to trigger NCCL Debug Dump.";
TORCH_CHECK(fd_ != -1, "Error opening named pipe ", filename);
}
bool shouldDump() {
if (fd_ == -1) {
return false;
}
char buf[128];
// non-blocking from O_NONBLOCK above.
// Ignore EINTR because we already will poll this
// again later.
ssize_t bytesRead = read(fd_, &buf, 128);
return bytesRead > 0;
}
~DumpPipe() {
if (fd_ != -1) {
close(fd_);
}
}
private:
int fd_ = -1;
};
#else
struct DumpPipe {
DumpPipe(int rank) {}
bool shouldDump() {
return false;
}
};
#endif
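DumpPipe opens `<TORCH_NCCL_DEBUG_INFO_PIPE_FILE><rank>.pipe` read-only and non-blocking, and any successful write to the FIFO makes shouldDump() return true on the next poll. A hedged sketch of triggering a dump from outside the training process (the path prefix is an assumption):

import os

# Assumes the job was launched with TORCH_NCCL_DEBUG_INFO_PIPE_FILE=/tmp/nccl_dump_
# and TORCH_NCCL_TRACE_BUFFER_SIZE > 0, so rank 0 created /tmp/nccl_dump_0.pipe.
pipe_path = "/tmp/nccl_dump_0.pipe"

# Opening the FIFO for writing succeeds immediately because the watchdog process
# already holds the read end open (O_RDONLY | O_NONBLOCK above).
fd = os.open(pipe_path, os.O_WRONLY)
os.write(fd, b"1")   # any non-empty write triggers the NCCL debug dump
os.close(fd)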
// ProcessGroupNCCL implements NCCL bindings for c10d.
//
// All functions of the class are expected to be called in the same order
@ -205,6 +289,8 @@ class TORCH_API ProcessGroupNCCL : public Backend {
uint64_t getSequencenumber() const override;
const std::string& logPrefix() const;
// Helper function that sets an exception_ptr on the WorkNCCL object.
void setException(std::exception_ptr exception_ptr);
@ -358,6 +444,7 @@ class TORCH_API ProcessGroupNCCL : public Backend {
// via `ncclCommSplit`
std::shared_ptr<ProcessGroupNCCL> split_from;
int64_t split_color{0};
std::string group_name;
};
// If you wish to create multiple process groups, each with a potentially
@ -540,8 +627,11 @@ class TORCH_API ProcessGroupNCCL : public Backend {
void enableCollectivesTiming() override;
// Provide an API for users to define their own ways to store NCCL debug info.
void registerDebugInfoWriter(std::unique_ptr<DebugInfoWriter> writer);
// Helper function for iteratively aborting communicators in the provided map
void abortCommsFromMap(
std::unordered_map<std::string, std::vector<std::shared_ptr<NCCLComm>>>&
ncclCommsMap,
c10::optional<std::string> abortReason);
// Provides an API to abort the ProcessGroup (similar to ncclCommAbort)
// instead of relying on ProcessGroupNCCL destructor.
@ -694,6 +784,19 @@ class TORCH_API ProcessGroupNCCL : public Backend {
// Desync debug helper
void logWorkEnd(WorkNCCL& work);
// Generates a prefix that is unique to this process group and rank, for
// disambiguating logs
std::string createLogPrefix() const;
// Returns the unique prefix created in createLogPrefix
const std::string& logPrefix() const;
// Returns the global rank of the device. This function assumes that users
// always create a default global process group (PG) which includes all
// devices. It is called in the constructor of ProcessGroupNCCL, so it always
// returns the rank_ of the very first PG created, i.e., the default global PG.
const int& globalRank() const;
protected:
// Function that runs as part of a separate thread aside from watchdog
// thread because we need to check the heartbeat from watchdog thread
@ -712,6 +815,19 @@ class TORCH_API ProcessGroupNCCL : public Backend {
// for dump completion.
std::future<bool> launchAsyncDebugDump();
// Helper to wait up to the specified timeout and then abandon the dump.
// Logs on timeout, and asserts the future's status is as expected.
void waitForDumpOrTimeout(
std::future<bool>& fut,
const std::chrono::time_point<std::chrono::steady_clock>& wakeUpTime,
size_t timeout_sec = 30);
// A helper function to wait for a future to complete or timeout.
void waitForFutureOrTimeout(
std::future<bool>& fut,
const std::chrono::milliseconds& timeOutMilSec,
const std::string& futDescription);
// When the watchdog times out, this function will be called to return debug info
// for users. For now we only get information from retrieveDesyncReport.
// We are working on enabling more useful debug information for watchdog
@ -720,9 +836,16 @@ class TORCH_API ProcessGroupNCCL : public Backend {
static const int64_t kWatchdogThreadSleepMillis;
// The store is used to broadcast the NCCL unique ID of rank 0.
// The store is used to broadcast the NCCL unique ID of rank 0. This store
// comes with a prefix and is different across ProcessGroupNCCL instances
// (i.e., different ProcessGroups).
c10::intrusive_ptr<Store> store_;
// Reference to the store without prefix so that keys are the same across all
// ProcessGroup NCCL instances and (key, value) pairs written to the store are
// global.
c10::intrusive_ptr<Store> globalStore_;
bool storeError_{false};
const c10::intrusive_ptr<Options> options_;
@ -781,11 +904,18 @@ class TORCH_API ProcessGroupNCCL : public Backend {
std::mutex mutex_;
// Heartbeat of watchdog thread.
uint64_t heartbeat_;
std::atomic_uint64_t heartbeat_;
// The time interval used for deciding whether there is no watchdog heartbeat.
int heartbeatTimeoutInSec_;
// Extra time of sleep when waiting for timeout dump to finish.
int waitTimeoutDumpInMilSec_;
// Interval at which to check coordinated signals in ProcessGroupNCCL from other ranks,
// e.g., trigger the dump of the debugging info for timeout when notified.
int coordCheckIntervalMilSec_;
// Size of ring buffer where we store NCCL Traces for debugging.
int ncclTraceBufferSize_;
@ -815,6 +945,15 @@ class TORCH_API ProcessGroupNCCL : public Backend {
// Whether there are hooks pending to be fired
std::atomic<bool> hasPendingHooks_;
// This is the signal from watchdog threads to indicate whether the monitor
// thread should dump. It is static so that it is accessible from all the
// PGs. With this flag, the monitor thread dumps debug info under any one of
// the 3 conditions: 1) this flag is set to true by the watchdog thread when
// it detects a timeout; 2) a timeout signal is received from other ranks
// through the tcpstore; 3) no heartbeat of the watchdog. Note that only the
// monitor thread from PG0 should dump the debug info, and only once.
static std::atomic<bool> shouldDump_;
// Mutex to Guard workMetaList_
std::mutex workMetaListMutex_;
@ -823,9 +962,6 @@ class TORCH_API ProcessGroupNCCL : public Backend {
bool writeDebugInfo_ = false;
// Mutex to Guard the check of writeDebugInfo_
std::mutex writeDebugInfoMutex_;
// Condition Variable for watchdog thread sleep
std::condition_variable workMetaListCV_;
@ -902,8 +1038,9 @@ class TORCH_API ProcessGroupNCCL : public Backend {
// Whether or not to enable timeout root cause analysis.
bool desyncDebug_;
// Whether or not to dump debug info on timeout
bool dumpOnTimeout_;
// Whether or not to dump debug info on exception, including both watchdog
// timeouts and NCCL errors.
bool dumpOnException_;
// Whether or not to create start CUDAEvent and enable timing for start
// and end events. Note that enableTiming_ is always true if desyncDebug_
@ -929,14 +1066,25 @@ class TORCH_API ProcessGroupNCCL : public Backend {
std::exception_ptr watchDogException_ = nullptr;
// The callback function to store NCCL debug info.
std::unique_ptr<DebugInfoWriter> debugInfoWriter_ = nullptr;
size_t uid_;
std::string logPrefix_;
};
TORCH_API std::string dump_nccl_trace();
// Gets a mutable reference to a global optional function. Heartbeat Monitor
// will query this function and if available, call it to dump traces. Inside
// fbcode, we store a function here that uses an internal tool for process
// tracing
TORCH_API c10::optional<std::function<std::string()>>& get_cpp_trace_dumper();
// Similar to get_cpp_trace_dumper, this stores a function defined in
// torch-python layer that lets us check whether the GIL can be acquired,
// helpful for instrumenting in cases where a hang was observed.
typedef bool (*gil_checker_t)();
TORCH_API gil_checker_t& get_gil_checker();
} // namespace c10d
#endif // USE_C10D_NCCL

View File

@ -13,7 +13,6 @@
#include <string>
#include <system_error>
#include <vector>
namespace c10d {
/* Trace Utils Related to TORCH_NCCL_DESYNC_DEBUG */
@ -269,10 +268,20 @@ inline std::string retrieveDesyncReport(
#ifdef USE_C10D_NCCL
DebugInfoWriter::DebugInfoWriter(int rank) {
std::string fileName = getCvarString(
{"TORCH_NCCL_DEBUG_INFO_TEMP_FILE"}, "/tmp/nccl_trace_rank_");
filename_ = c10::str(fileName, rank);
/* Helper used by work::getDuration() and nccl flight recorder */
float getDurationFromFirstEvent(
const std::vector<at::cuda::CUDAEvent>& ncclStartEvents,
const std::vector<at::cuda::CUDAEvent>& ncclEndEvents) {
TORCH_CHECK(
ncclStartEvents.size() == 1,
"getDuration only works for single device per ProcessGroup, but found multiple start events.");
TORCH_CHECK(
ncclEndEvents.size() == 1,
"getDuration only works for single device per ProcessGroup, but found multiple end events.");
TORCH_CHECK(
ncclEndEvents[0].query(),
"getDuration can only be called after work is succeeded.")
return ncclStartEvents[0].elapsed_time(ncclEndEvents[0]);
}
DebugInfoWriter::~DebugInfoWriter() = default;
@ -293,6 +302,31 @@ void DebugInfoWriter::write(const std::string& ncclTrace) {
LOG(INFO) << "Finished writing NCCLPG debug info to " << filename_;
}
DebugInfoWriter& DebugInfoWriter::getWriter(int rank) {
if (writer_ == nullptr) {
std::string fileNamePrefix = getCvarString(
{"TORCH_NCCL_DEBUG_INFO_TEMP_FILE"}, "/tmp/nccl_trace_rank_");
// Using std::unique_ptr here to auto-delete the writer object
// when the pointer itself is destroyed.
std::unique_ptr<DebugInfoWriter> writerPtr(
new DebugInfoWriter(fileNamePrefix, rank));
DebugInfoWriter::registerWriter(std::move(writerPtr));
}
return *writer_;
}
void DebugInfoWriter::registerWriter(std::unique_ptr<DebugInfoWriter> writer) {
TORCH_CHECK_WITH(
DistBackendError,
hasWriterRegistered_.load() == false,
"debugInfoWriter already registered");
hasWriterRegistered_.store(true);
writer_ = std::move(writer);
}
std::unique_ptr<DebugInfoWriter> DebugInfoWriter::writer_ = nullptr;
std::atomic<bool> DebugInfoWriter::hasWriterRegistered_(false);
inline std::string pickle_str(const c10::IValue& v) {
std::vector<char> result;
{
@ -317,6 +351,18 @@ inline c10::List<c10::IValue> new_list() {
return c10::List<c10::IValue>(c10::AnyType::get());
}
inline std::string ranks_str(const std::vector<uint64_t>& ranks) {
std::string str;
for (const auto& rank : ranks) {
if (str.empty()) {
str = std::to_string(rank);
} else {
str += ", " + std::to_string(rank);
}
}
return c10::str("[", str, "]");
}
struct NCCLTraceBuffer {
static NCCLTraceBuffer* get() {
// intentionally leak on exit
@ -336,11 +382,12 @@ struct NCCLTraceBuffer {
// buffer this entry will be located to
// update state information
size_t pg_id_;
std::string pg_name_;
size_t seq_id_; // as tracked by the process group
const char* profiling_name_;
std::string profiling_name_;
std::shared_ptr<torch::CapturedTraceback> traceback_;
// we borrow pointser to start_ and end_ so we can query the state
// we borrow pointers to start_ and end_ so we can query the state
// on reporting. However, once the event is completed, the call
// to `complete` will clear these.
EventList *start_, *end_;
@ -348,8 +395,18 @@ struct NCCLTraceBuffer {
// timestamp when the entry was created, likely close to the time the work
// was 'enqueued'- not necessarily started
c10::time_t time_created_;
c10::optional<float> duration_;
const char* state_ = "scheduled";
// timestamp when our CPU threads discovered that the kernel started.
// will always be _after_ it actually started, and can be very late
// if the watchdog thread got stuck on CUDA APIs.
c10::optional<c10::time_t> time_discovered_started_;
// timestamp when our CPU threads discovered that the kernel completed.
// will always be _after_ it actually completed, and can be the same time
// as the discovery of the start if the watchdog thread is stuck on CUDA
// APIs
c10::optional<c10::time_t> time_discovered_completed_;
// size information for input/output tensors
c10::SmallVector<int, 4> input_dims_;
@ -369,8 +426,9 @@ struct NCCLTraceBuffer {
c10::optional<size_t> record(
size_t pg_id,
const std::string& pg_name,
size_t seq_id,
const char* profiling_name,
std::string profiling_name,
const std::vector<at::Tensor>& inputs,
const std::vector<at::Tensor>& outputs,
EventList* start,
@ -385,8 +443,9 @@ struct NCCLTraceBuffer {
auto te = Entry{
id_,
pg_id,
pg_name,
seq_id,
profiling_name,
std::move(profiling_name),
std::move(traceback),
std::move(start),
std::move(end),
@ -424,8 +483,8 @@ struct NCCLTraceBuffer {
break;
}
}
if (started) {
r.state_ = "started";
if (started && !r.time_discovered_started_) {
r.time_discovered_started_ = c10::getTime();
}
}
if (r.end_ != nullptr) {
@ -436,8 +495,8 @@ struct NCCLTraceBuffer {
break;
}
}
if (completed) {
r.state_ = "completed";
if (completed && !r.time_discovered_completed_) {
r.time_discovered_completed_ = c10::getTime();
}
}
}
@ -456,35 +515,97 @@ struct NCCLTraceBuffer {
return result;
}
void retire_id(c10::optional<size_t> id) {
/*
Mark an Event as completed and free its events.
This is called by the watchdog thread, and is asynchronous from the
perspective of the main thread.
compute_duration defaults to true since retire_id is only called in the
watchdog thread, which is currently a place we call cuda APIs which may hang,
but care should be taken to avoid computing duration in any function that must
never hang. (timing must also be enabled for compute_duration - see
TORCH_NCCL_ENABLE_TIMING).
*/
void retire_id(c10::optional<size_t> id, bool compute_duration = true) {
if (!enabled_ || !id) {
return;
}
std::lock_guard<std::mutex> guard(mutex_);
auto& entry = entries_.at(*id % max_entries_);
if (entry.id_ == *id) {
update_state(entry);
entry.retired_ = true;
entry.start_ = entry.end_ = nullptr;
bool can_compute_duration = false;
EventList* startEvents = nullptr;
EventList* endEvents = nullptr;
c10::optional<float> duration = c10::nullopt;
std::unique_lock<std::mutex> guard(mutex_);
Entry* entry = &entries_.at(*id % max_entries_);
if (entry->id_ == *id) {
update_state(*entry);
if (compute_duration) {
can_compute_duration = entry->time_discovered_completed_.has_value() &&
entry->start_ && entry->end_;
startEvents = entry->start_;
endEvents = entry->end_;
}
}
if (can_compute_duration) {
// Compute duration without holding the lock, because
// cudaEventDuration() can hang, and we need to acquire the lock before we
// can dump(), which we never want to block.
guard.unlock();
duration = getDurationFromFirstEvent(*startEvents, *endEvents);
guard.lock();
// Refresh the entry pointer, see if the entry has been overwritten
entry = &entries_.at(*id % max_entries_);
if (entry->id_ != *id) {
LOG(INFO)
<< "retire_id abandoned for id " << *id
<< ", event was overwritten while waiting to compute duration.";
return;
}
if (duration.has_value()) {
entry->duration_ = duration.value();
}
}
entry->retired_ = true;
entry->start_ = entry->end_ = nullptr;
}
std::string dump() {
std::string dump(
const c10::optional<std::unordered_map<
std::string,
std::unordered_map<std::string, std::string>>>& ncclDumpMap) {
auto result = dump_entries();
auto entries = new_list();
c10::IValue pg_id_s = "pg_id";
c10::IValue seq_id_s = "seq_id";
c10::IValue profiling_name_s = "profiling_name";
c10::IValue input_sizes_s = "input_sizes";
c10::IValue output_sizes_s = "output_sizes";
c10::IValue time_created_s = "time_created_us";
c10::IValue entries_key = "entries";
c10::IValue nccl_comm_key = "nccl_comm_state";
c10::IValue version_key = "version";
// Update whenever changing contents or formatting of the dump
// (minor when adding fields, major when changing existing fields)
c10::IValue version_val = "1.1";
c10::IValue frames_s = "frames";
c10::IValue state_s = "state";
c10::IValue line_s = "line";
c10::IValue name_s = "name";
c10::IValue filename_s = "filename";
c10::IValue retired_s = "retired";
c10::IValue pg_id_key = "pg_id";
c10::IValue pg_name_key = "process_group";
c10::IValue seq_id_key = "seq_id";
c10::IValue profiling_name_key = "profiling_name";
c10::IValue input_sizes_key = "input_sizes";
c10::IValue output_sizes_key = "output_sizes";
c10::IValue time_created_key = "time_created_ns";
c10::IValue duration_key = "duration_ms";
c10::IValue frames_key = "frames";
c10::IValue state_key = "state";
c10::IValue line_key = "line";
c10::IValue name_key = "name";
c10::IValue filename_key = "filename";
c10::IValue retired_key = "retired";
c10::IValue time_discovered_started_key = "time_discovered_started_ns";
c10::IValue time_discovered_completed_key = "time_discovered_completed_ns";
std::vector<torch::CapturedTraceback*> tracebacks;
for (auto& e : result) {
@ -494,9 +615,9 @@ struct NCCLTraceBuffer {
std::vector<c10::IValue> all_frames;
for (const auto& f : stracebacks.all_frames) {
auto d = new_dict();
d.insert(name_s, f.funcname);
d.insert(filename_s, f.filename);
d.insert(line_s, int64_t(f.lineno));
d.insert(name_key, f.funcname);
d.insert(filename_key, f.filename);
d.insert(line_key, int64_t(f.lineno));
all_frames.emplace_back(std::move(d));
}
@ -504,10 +625,14 @@ struct NCCLTraceBuffer {
auto& e = result.at(i);
auto& tb = stracebacks.tracebacks.at(i);
auto dict = new_dict();
dict.insert(pg_id_s, int64_t(e.pg_id_));
dict.insert(seq_id_s, int64_t(e.seq_id_));
dict.insert(profiling_name_s, e.profiling_name_);
dict.insert(time_created_s, int64_t(e.time_created_ / 1000));
dict.insert(pg_id_key, int64_t(e.pg_id_));
dict.insert(pg_name_key, e.pg_name_);
dict.insert(seq_id_key, int64_t(e.seq_id_));
dict.insert(profiling_name_key, e.profiling_name_);
dict.insert(time_created_key, int64_t(e.time_created_));
if (e.duration_) {
dict.insert(duration_key, *e.duration_);
}
auto it = e.sizes_.begin();
auto read_sizes = [&](const c10::SmallVector<int, 4>& dims) {
@ -523,19 +648,55 @@ struct NCCLTraceBuffer {
return sizes;
};
dict.insert(input_sizes_s, read_sizes(e.input_dims_));
dict.insert(output_sizes_s, read_sizes(e.output_dims_));
dict.insert(state_s, e.state_);
dict.insert(retired_s, e.retired_);
dict.insert(input_sizes_key, read_sizes(e.input_dims_));
dict.insert(output_sizes_key, read_sizes(e.output_dims_));
if (e.time_discovered_completed_.has_value()) {
dict.insert(state_key, "completed");
} else if (e.time_discovered_started_.has_value()) {
dict.insert(state_key, "started");
} else {
dict.insert(state_key, "scheduled");
}
dict.insert(
time_discovered_started_key,
e.time_discovered_started_.has_value()
? int64_t(*e.time_discovered_started_)
: c10::IValue());
dict.insert(
time_discovered_completed_key,
e.time_discovered_completed_.has_value()
? int64_t(*e.time_discovered_completed_)
: c10::IValue());
dict.insert(retired_key, e.retired_);
auto frames = new_list();
for (int64_t frame : tb) {
frames.push_back(all_frames.at(frame));
}
dict.insert(frames_s, frames);
dict.insert(frames_key, frames);
entries.push_back(dict);
}
return pickle_str(entries);
// convert ncclDumpMap into a dictionary
auto per_comm_dict = new_dict();
if (ncclDumpMap.has_value()) {
for (const auto& [ncclId, ncclDump] : ncclDumpMap.value()) {
auto inner_dict = new_dict();
for (const auto& [key, value] : ncclDump) {
inner_dict.insert(key, value);
}
per_comm_dict.insert(ncclId, inner_dict);
}
}
auto dict = new_dict();
dict.insert(entries_key, entries);
dict.insert(version_key, version_val);
if (per_comm_dict.size() > 0) {
dict.insert(nccl_comm_key, per_comm_dict);
}
return pickle_str(dict);
}
};
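The dump is now a pickled top-level dictionary rather than a bare list of entries: an "entries" list, a "version" string, and an optional "nccl_comm_state" map. A minimal sketch of inspecting a dump file written by the default DebugInfoWriter (the path prefix is the default unless TORCH_NCCL_DEBUG_INFO_TEMP_FILE overrides it; loading with plain pickle is an assumption that holds for these basic types):

import pickle

with open("/tmp/nccl_trace_rank_0", "rb") as f:
    trace = pickle.load(f)

print(trace["version"])          # "1.1" for this revision of the format
for entry in trace["entries"]:
    print(entry["process_group"], entry["seq_id"], entry["profiling_name"],
          entry["state"], entry.get("duration_ms"))
print(trace.get("nccl_comm_state", {}))  # per-communicator ncclCommDump output, if present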

View File

@ -50,6 +50,35 @@
namespace {
#ifdef USE_C10D_NCCL
bool acquire_gil() {
// basically if this function can acquire the gil, it will return quickly.
// if not, it will hang forever. The idea is to call this from a thread
// wrapped in a future, and then check the future after a timeout, to
// determine whether we're facing gil contention.
if (Py_IsInitialized()) {
pybind11::gil_scoped_acquire gil;
return true;
}
// If we end up here, it's probably still a "pass" from the perspective of
// checking whether python is stuck, but currently we don't check the return
// value of this function anyway; we just check whether it returned quickly vs
// timing out. Taking a long time is the main sign of trouble. A fast return,
// whether true or false, is OK from the perspective of debugging python
// hangs.
return false;
}
bool registerGilChecker() {
c10d::get_gil_checker() = &acquire_gil;
return true;
}
static bool registered = registerGilChecker();
#endif // USE_C10D_NCCL
// Wrapper to ensure GIL is released before destructing ProcessGroupGloo
// TODO: move this somewhere more generally useful
template <typename T>
@ -1033,6 +1062,29 @@ Example::
>>> store.add("first_key", 6)
>>> # Should return 7
>>> store.get("first_key")
)")
.def(
"check",
&::c10d::Store::check,
py::call_guard<py::gil_scoped_release>(),
R"(
The call to check whether a given list of ``keys`` have values stored in
the store. This call immediately returns in normal cases but still suffers
from some edge deadlock cases, e.g., calling check after the TCPStore has been destroyed.
Call :meth:`~torch.distributed.store.check` with a list of keys that
one wants to check whether they are stored in the store or not.
Arguments:
keys (list[str]): The keys to query whether they are stored in the store.
Example::
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> # Using TCPStore as an example, other store types can also be used
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> store.add("first_key", 1)
>>> # Should return True
>>> store.check(["first_key"])
)")
.def(
"delete_key",
@ -1404,7 +1456,11 @@ Arguments:
.def_property_readonly(
"underlying_store",
&::c10d::PrefixStore::getUnderlyingStore,
R"(Gets the underlying store object that PrefixStore wraps around.)");
R"(Gets the underlying store object that PrefixStore wraps around.)")
.def_property_readonly(
"_underlying_non_prefix_store",
&::c10d::PrefixStore::getUnderlyingNonPrefixStore,
R"(Recursively to get the store before layers of wrapping with PrefixStore.)");
auto processGroup =
py::class_<
@ -1807,6 +1863,15 @@ Arguments:
"group_name",
&::c10d::ProcessGroup::getGroupName,
"(Gets this process group name. It's cluster unique)")
.def(
"_set_group_desc",
&::c10d::ProcessGroup::setGroupDesc,
py::call_guard<py::gil_scoped_acquire>(),
"Sets the process group description. This is an internal C10D method, do not use.")
.def_property_readonly(
"group_desc",
&::c10d::ProcessGroup::getGroupDesc,
"Gets this process group description")
.def_property(
"bound_device_id",
&::c10d::ProcessGroup::getBoundDeviceId,
@ -2387,7 +2452,9 @@ Example::
.def_readwrite(
"split_from", &::c10d::ProcessGroupNCCL::Options::split_from)
.def_readwrite(
"split_color", &::c10d::ProcessGroupNCCL::Options::split_color);
"split_color", &::c10d::ProcessGroupNCCL::Options::split_color)
.def_readwrite(
"group_name", &::c10d::ProcessGroupNCCL::Options::group_name);
#endif

View File

@ -0,0 +1,448 @@
import io
import math
from typing import Any, Callable, Dict, Optional, Tuple, TYPE_CHECKING
import torch
import torch.distributed as dist
import torch.nn.functional as F
from torch.distributed._functional_collectives import AsyncCollectiveTensor
if dist.is_available() or TYPE_CHECKING:
from torch.distributed import distributed_c10d
from torch.distributed._shard.sharded_tensor import ShardedTensor
from torch.distributed._tensor import DTensor, Replicate
def _identity_func(
obj: torch.Tensor,
pg: Optional[dist.ProcessGroup],
device: Optional[torch.device],
companion_obj: Any,
) -> torch.Tensor:
return obj
def _all_gather_sharded_tensor(
sharded_tensor: "ShardedTensor",
pg: Optional[dist.ProcessGroup] = None,
device: Optional[torch.device] = None,
) -> torch.Tensor:
if pg is None:
pg = distributed_c10d._get_default_group()
world_size = dist.get_world_size(pg)
shards = sharded_tensor.local_shards()
dim_0_size = sharded_tensor.size()[0] # type: ignore[index]
tensor_numel = sharded_tensor.size().numel() # type: ignore[union-attr]
chunk_size = math.ceil(dim_0_size / world_size) * tensor_numel // dim_0_size
pg_device = (
distributed_c10d._get_pg_default_device(pg) if device is None else device
)
if shards:
local_tensor = shards[0].tensor.flatten()
if local_tensor.device.type != pg_device.type:
local_tensor = local_tensor.to(pg_device)
num_padding = chunk_size - local_tensor.numel()
if num_padding > 0:
local_tensor = F.pad(local_tensor, [0, num_padding])
else:
local_tensor = torch.zeros(
chunk_size, dtype=sharded_tensor.dtype, device=pg_device
)
tensor = torch.empty(
chunk_size * world_size,
dtype=local_tensor.dtype,
device=pg_device,
)
dist.all_gather_into_tensor(tensor, local_tensor, group=pg)
tensor = tensor.narrow(0, 0, tensor_numel).reshape(sharded_tensor.size())
return tensor
class CompanionMismatch(Exception):
...
def _iterate_state_dict(
iter_object: Any,
sharded_tensor_func: Callable,
dtensor_func: Callable,
tensor_func: Callable,
*,
pg: Optional[dist.ProcessGroup] = None,
device: Optional[torch.device] = None,
cpu_offload: bool = False,
companion_obj: Any = None,
ranks_only: Tuple[int, ...] = tuple(),
type_check: bool = True,
non_blocking: bool = True,
) -> Dict[str, Any]:
"""Iterate through the state dict, applying the given functions to each tensor type.
Args:
iter_object (Any): the target state_dict.
sharded_tensor_func (Callable): the function to apply to ShardedTensor
dtensor_func (Callable): the function to apply to DTensor
tensor_func (Callable): the function to apply to Tensor
pg (Optional[dist.ProcessGroup]): process group passed to tensor functions
device (Optional[torch.device]): device passed to tensor functions
cpu_offload (bool): whether to offload the tensors to CPU memory. This option is ignored
if a companion_obj is supplied.
companion_obj (Any): A companion object to the state dict. If this object
is supplied, we attempt to copy the tensor to the companion object.
ranks_only (Tuple[int, ...]): if this tuple is empty, all ranks will
have the same state_dicts. Otherwise only ranks that are in ``ranks_only``
have the same state_dicts. Other ranks will get empty state_dicts.
type_check (bool): check if the instance data type is a supported type
that can be saved by DCP. The current supported data types are
torch.Tensor, DTensor, int, float, str, list, dict, None.
non_blocking (bool): whether to use non-blocking copy when copying to the companion object.
"""
# TODO: should we use pytree?
cpu_device = torch.device("cpu")
if isinstance(iter_object, ShardedTensor):
ret = sharded_tensor_func(iter_object, pg, device, companion_obj)
elif isinstance(iter_object, DTensor):
ret = dtensor_func(iter_object, pg, device, companion_obj)
elif isinstance(iter_object, torch.Tensor):
ret = tensor_func(iter_object, pg, device, companion_obj)
elif (
isinstance(iter_object, (int, float, str, bytes, io.BytesIO))
or iter_object is None
):
ret = iter_object
elif isinstance(iter_object, dict):
if companion_obj is not None and (
not isinstance(companion_obj, dict)
or set(companion_obj.keys()) != set(iter_object.keys())
):
raise CompanionMismatch()
ret = {
key: _iterate_state_dict(
value,
sharded_tensor_func,
dtensor_func,
tensor_func,
pg=pg,
device=device,
cpu_offload=cpu_offload,
companion_obj=companion_obj[key] if companion_obj is not None else None,
ranks_only=ranks_only,
type_check=type_check,
non_blocking=non_blocking,
)
for key, value in iter_object.items()
}
elif isinstance(iter_object, (list, tuple)):
if companion_obj is not None and (
not isinstance(companion_obj, (list, tuple))
or len(companion_obj) != len(iter_object)
):
raise CompanionMismatch()
ret = [
_iterate_state_dict(
v,
sharded_tensor_func,
dtensor_func,
tensor_func,
pg=pg,
device=device,
cpu_offload=cpu_offload,
companion_obj=companion_obj[idx] if companion_obj is not None else None,
ranks_only=ranks_only,
type_check=type_check,
non_blocking=non_blocking,
)
for idx, v in enumerate(iter_object)
]
if isinstance(iter_object, tuple):
ret = tuple(ret)
elif not type_check:
ret = iter_object
else:
raise ValueError(f"Unexpected value type {type(iter_object)}")
if not ranks_only or dist.get_rank(pg) in ranks_only:
if isinstance(ret, torch.Tensor):
if cpu_offload and companion_obj is None:
ret = ret.to(cpu_device)
if companion_obj is not None:
# TODO: support DTensor
companion_obj.copy_(ret, non_blocking=non_blocking)
ret = companion_obj
else:
ret = {} if isinstance(ret, dict) else None
return ret
def _gather_state_dict(
state_dict: Dict[str, Any],
*,
pg: Optional[dist.ProcessGroup] = None,
device: Optional[torch.device] = None,
cpu_offload: bool = False,
ranks_only: Tuple[int, ...] = tuple(),
type_check: bool = True,
) -> Dict[str, Any]:
"""
Given a state_dict, this API gathers all the ShardedTensors or DTensors in
the state_dict.
Args:
state_dict (Dict[str, Any]): the target sharded state_dict.
pg (Optional[dist.ProcessGroup]): the process group that is used to
gather ShardedTensor. Note that gathering a DTensor will use
the DeviceMesh. So this argument will be ignored when gathering a
DTensor.
device: (Optional[torch.device]): the device that is used to
perform allgather for ShardedTensor. Note that gathering a DTensor
will use the DeviceMesh. So this argument will be ignored when
gathering a DTensor.
cpu_offload (bool): whether to offload the tensors to CPU memory. The
default value is False.
ranks_only: (Tuple[int, ...]): if this tuple is empty, all ranks will
have the same state_dicts. Otherwise only ranks that are in ``ranks_only``
have the same state_dicts. Other ranks will get empty state_dicts.
type_check: (bool): check if the instance data type is a supported type
that can be saved by DCP. The current supported data types are
torch.Tensor, DTensor, int, float, str, list, dict, None.
Returns:
The gathered state dictionary.
"""
def sharded_tensor_func(value, pg, device, companion_obj):
# ShardedTensor does not seem to record the original device type.
# So if the tensor is moved to CPU, we won't know the original type.
# As a result, we have to rely on the user to tell us the correct one.
cpu_device = torch.device("cpu")
output_tensor = _all_gather_sharded_tensor(value, pg, device)
local_shard_device = (
value.local_shards()[0].tensor.device
if value.local_shards()
else cpu_device
)
if output_tensor.device != local_shard_device:
value = output_tensor.to(local_shard_device)
else:
value = output_tensor
return value
def dtensor_func(value, pg, device, companion_obj):
if value.device != value.device_mesh.device_type:
value = value.to(value.device_mesh.device_type)
# FSDP all_gather: [Shard(0)] -> [Replicate()]
# HSDP all_gather: [Replicate(), Shard(0)] -> [Replicate(), Replicate()]
# 2D FSDP + TP all_gather:
# - [Shard(0), Shard(n)] -> [Replicate(), Replicate()]
# - [Shard(0), Replicate()] -> [Replicate(), Replicate()]
placements = [Replicate() for _ in value.placements]
value = value.redistribute(
device_mesh=value.device_mesh,
placements=placements,
)
# Call `wait()` to force the tensor to be synchronous with respect
# to the main stream.
# See the discussion in https://github.com/pytorch/pytorch/pull/117799.
value = value.to_local()
if isinstance(value, AsyncCollectiveTensor):
value = value.wait()
return value
return _iterate_state_dict(
state_dict,
sharded_tensor_func,
dtensor_func,
_identity_func,
pg=pg,
device=device,
cpu_offload=cpu_offload,
ranks_only=ranks_only,
type_check=type_check,
)
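A minimal usage sketch, assuming this module lands as torch.distributed._state_dict_utils and that `model` is a hypothetical sharded (e.g. FSDP-wrapped) module whose state_dict contains ShardedTensor or DTensor entries:

from torch.distributed._state_dict_utils import _gather_state_dict

# Every rank receives a fully unsharded copy; cpu_offload moves it to host memory.
# Passing ranks_only=(0,) instead would leave all other ranks with empty state_dicts.
full_state_dict = _gather_state_dict(
    model.state_dict(),   # hypothetical sharded model
    cpu_offload=True,
    ranks_only=tuple(),
)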
def _offload_state_dict_to_cpu(
state_dict: Dict[str, Any],
*,
ranks_only: Tuple[int, ...] = tuple(),
type_check: bool = True,
) -> Dict[str, Any]:
"""
Given a state_dict, this API offloads all the tensors to CPU memory.
Args:
state_dict (Dict[str, Any]): the target state_dict.
ranks_only: (Tuple[int, ...]): if this tuple is empty, all ranks will
have the same state_dicts. Otherwise only ranks that are in ``ranks_only``
have the same state_dicts. Other ranks will get empty state_dicts.
type_check: (bool): check if the instance data type is a supported type
that can be saved by DCP. The current supported data types are
torch.Tensor, DTensor, int, float, str, list, dict, None.
Returns:
The offloaded state dictionary.
"""
ret = _iterate_state_dict(
state_dict,
_identity_func,
_identity_func,
_identity_func,
pg=None,
device=None,
cpu_offload=True,
ranks_only=ranks_only,
type_check=type_check,
)
return ret
def _copy_state_dict(
state_dict: Dict[str, Any],
copy_state_dict: Dict[str, Any],
non_blocking: bool = False,
):
"""
Copies all tensors in a given state dict into a different state_dict with the
same structure.
.. warning::
It is expected by this function that state_dict and copy_state_dict share
the same structure and data types.
.. warning::
The current supported data types are
torch.Tensor, DTensor, int, float, str, list, dict, None.
Args:
state_dict (Dict[str, Any]): the target state_dict.
copy_state_dict (Dict[str, Any]):
The state dict we are copying into. This state_dict must have exactly
the same structure as the source `state_dict`.
non_blocking: (bool): Whether copy ops should be performed asynchronously
"""
_iterate_state_dict(
state_dict,
_identity_func,
_identity_func,
_identity_func,
pg=None,
device=None,
cpu_offload=False,
ranks_only=tuple(),
companion_obj=copy_state_dict,
type_check=True,
non_blocking=non_blocking,
)
def _create_cpu_state_dict(
state_dict: Dict[str, Any], pin_memory: bool = False, share_memory: bool = False
) -> Dict[str, Any]:
"""
Given a state_dict, create another state_dict with the same structure and elements.
However, all tensors in the returned state_dict are new tensors on CPU. These
tensors can be placed in pinned or shared memory based on the provided arguments.
.. warning::
Setting both `pin_memory` and `share_memory` to True significantly increases the
latency of this method because of the nuances which require us to register memory
as pinned directly as opposed to relying on the pin_memory cache allocator. This
option should only be used for long lived tensors which are required to be shared.
This is not the case as long as at least one of `pin_memory` or `share_memory` is
set to False.
"""
def tensor_func(
obj: torch.Tensor,
pg: Optional[dist.ProcessGroup],
device: Optional[torch.device],
_: Any,
) -> torch.Tensor:
if len(obj.size()) == 0:
return torch.tensor(0, dtype=obj.dtype)
if share_memory:
t = torch.empty(*tuple(obj.size()), dtype=obj.dtype).share_memory_()
if pin_memory:
succ = torch.cuda.cudart().cudaHostRegister(
t.data_ptr(),
t.numel() * t.element_size(),
1, # lines up with 'cudaHostRegisterPortable'
)
assert (
succ == 0
), f"Pinning shared memory failed with error-code: {succ}"
return t
elif pin_memory:
return torch.empty(*tuple(obj.size()), dtype=obj.dtype).pin_memory()
else:
return torch.empty(*tuple(obj.size()), dtype=obj.dtype)
ret = _iterate_state_dict(
state_dict,
_identity_func,
_identity_func,
tensor_func,
pg=None,
device=None,
cpu_offload=False,
ranks_only=tuple(),
type_check=False,
)
return ret
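A minimal sketch of pairing _create_cpu_state_dict with _copy_state_dict for repeated, asynchronous GPU-to-CPU snapshots (the module path and the toy model are assumptions; requires a CUDA device):

import torch
from torch.distributed._state_dict_utils import _copy_state_dict, _create_cpu_state_dict

model = torch.nn.Linear(8, 8).cuda()
gpu_sd = model.state_dict()

# Allocate the CPU mirror once: same structure, tensors pinned and in shared memory.
cpu_sd = _create_cpu_state_dict(gpu_sd, pin_memory=True, share_memory=True)

# Reuse the mirror for every snapshot; non_blocking lets the D2H copies overlap compute.
_copy_state_dict(gpu_sd, cpu_sd, non_blocking=True)
torch.cuda.synchronize()  # make sure the async copies have finished before reading cpu_sd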
def _check_state_dict_similarity(
state_dict: Dict[str, Any],
compared_state_dict: Dict[str, Any],
) -> bool:
"""
Given two state_dicts, check if the structures are the same. If
a [key, tensor] pair exists in one state_dict there must be
a corresponding pair, [key, other_tensor], in the other state_dict,
where tensor and other_tensor have the same size and dtype.
Return the check result.
"""
def tensor_func(
obj: torch.Tensor,
pg: Optional[dist.ProcessGroup],
device: Optional[torch.device],
companion_obj: Any,
) -> torch.Tensor:
if companion_obj.dtype != obj.dtype or companion_obj.size() != obj.size():
raise CompanionMismatch()
return obj
try:
_iterate_state_dict(
state_dict,
_identity_func,
_identity_func,
tensor_func,
pg=None,
device=None,
cpu_offload=False,
ranks_only=tuple(),
companion_obj=compared_state_dict,
type_check=False,
)
except CompanionMismatch:
return False
return True

View File

@ -1171,7 +1171,7 @@ def init_process_group(
)
default_pg, _ = _new_process_group_helper(
-1, -1, [], backend, None, group_name, timeout=timeout
-1, -1, [], backend, None, group_name, timeout=timeout, group_desc="default_pg"
)
_update_default_pg(default_pg)
else:
@ -1197,6 +1197,7 @@ def init_process_group(
pg_options=pg_options,
timeout=timeout,
device_id=device_id,
group_desc="default_pg"
)
_update_default_pg(default_pg)
@ -1257,6 +1258,7 @@ def _new_process_group_helper(
timeout=None,
pg_tag=None,
device_id=None,
group_desc=None,
):
"""
Create a new distributed process group.
@ -1289,6 +1291,8 @@ def _new_process_group_helper(
_, prefix_store = _world.pg_map[existing_group]
return existing_group, prefix_store
group_desc = "undefined" if group_desc is None else group_desc
# The list of group ranks is empty if we're creating the default group.
is_default_group = len(global_ranks_in_group) == 0
@ -1375,6 +1379,7 @@ def _new_process_group_helper(
if split_from:
pg_options.split_from = split_from
pg_options.split_color = _process_group_color(global_ranks_in_group)
pg_options.group_name = group_name
backend_class = ProcessGroupNCCL(
backend_prefix_store, group_rank, group_size, pg_options)
backend_type = ProcessGroup.BackendType.NCCL
@ -1461,9 +1466,11 @@ def _new_process_group_helper(
# update global state
assert group_name is not None
assert group_desc is not None
_world.pg_map[pg] = (backend, prefix_store)
_world.pg_names[pg] = group_name
pg._set_group_name(group_name)
pg._set_group_desc(group_desc)
_world.pg_backend_config[pg] = str(backend_config)
# "" is the default tag for user PGs
@ -3614,7 +3621,7 @@ def _get_backend_from_str(backend: Optional[str] = None) -> Backend:
@_time_logger
def new_group(ranks=None, timeout=None, backend=None, pg_options=None, use_local_synchronization=False):
def new_group(ranks=None, timeout=None, backend=None, pg_options=None, use_local_synchronization=False, group_desc=None):
"""
Create a new distributed group.
@ -3655,6 +3662,7 @@ def new_group(ranks=None, timeout=None, backend=None, pg_options=None, use_local
barrier at the end of the process group creation. This is different
in that non-member ranks don't need to call into API and don't
join the barrier.
group_desc (str, optional): a string to describe the process group.
Returns:
A handle of distributed group that can be given to collective calls or None if the rank is not part of ``ranks``.
@ -3669,7 +3677,15 @@ def new_group(ranks=None, timeout=None, backend=None, pg_options=None, use_local
multiple overlapping process groups. To avoid that, make sure all ranks follow the
same global creation order.
"""
return _new_group_with_tag(ranks, timeout, backend, pg_options, None, use_local_synchronization=use_local_synchronization)
return _new_group_with_tag(
ranks,
timeout,
backend,
pg_options,
None,
use_local_synchronization=use_local_synchronization,
group_desc=group_desc,
)
def _new_group_with_tag(
ranks=None,
@ -3677,7 +3693,8 @@ def _new_group_with_tag(
backend=None,
pg_options=None,
pg_tag=None,
use_local_synchronization=False
use_local_synchronization=False,
group_desc=None
):
"""
Variant of ``new_group`` that exposes tag creation.
@ -3749,7 +3766,8 @@ def _new_group_with_tag(
group_name,
pg_options=pg_options,
timeout=timeout,
pg_tag=pg_tag
pg_tag=pg_tag,
group_desc=group_desc
)
# Create the global rank to group rank mapping
@ -3789,6 +3807,7 @@ def new_subgroups(
timeout=None,
backend=None,
pg_options=None,
group_desc=None,
):
"""
Create subgroups of equal size.
@ -3841,6 +3860,8 @@ def new_subgroups(
the construction of specific process groups. i.e. for the ``nccl``
backend, ``is_high_priority_stream`` can be specified so that
process group can pick up high priority cuda streams.
group_desc (str, optional): A string describing the group. Each subgroup will
inherit its group_desc.
Returns:
The subgroup containing the current rank, and all the subgroups used for cleanup.
@ -3886,6 +3907,7 @@ def new_subgroups(
timeout=timeout,
backend=backend,
pg_options=pg_options,
group_desc=group_desc,
)
subgroups.append(subgroup)
@ -3905,6 +3927,7 @@ def new_subgroups_by_enumeration(
timeout=None,
backend=None,
pg_options=None,
group_desc=None,
):
"""
Create subgroups by dividing the global world.
@ -3945,6 +3968,8 @@ def new_subgroups_by_enumeration(
the construction of specific process groups. i.e. for the ``nccl``
backend, ``is_high_priority_stream`` can be specified so that
process group can pick up high priority cuda streams.
group_desc (str, optional): A string describing the group. Each subgroup will
inherit its group_desc.
Returns:
The subgroup containing the current rank, and all the subgroups used for cleanup.
@ -3973,6 +3998,7 @@ def new_subgroups_by_enumeration(
timeout=timeout,
backend=backend,
pg_options=pg_options,
group_desc=group_desc,
)
subgroups.append(subgroup)
my_rank = get_rank()
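A minimal sketch of the new group_desc plumbing from the Python side (assumes a torchrun-style launch with at least two ranks): the description set here becomes visible on the ProcessGroup object and, for NCCL, is forwarded to the backend and communicator.

import torch.distributed as dist

dist.init_process_group("nccl")                        # default PG is tagged "default_pg"
pp_group = dist.new_group(ranks=[0, 1], group_desc="PP")

default_pg = dist.distributed_c10d._get_default_group()
print(default_pg.group_desc)    # "default_pg"
if dist.get_rank() in (0, 1):   # new_group only returns a real PG on member ranks
    print(pp_group.group_desc)  # "PP"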

View File

@ -28,7 +28,7 @@ def tail_logfile(
return
time.sleep(interval_sec)
with open(file) as fp:
with open(file, errors="replace") as fp:
while True:
line = fp.readline()