[c10d][flight recorder] dump additinal NCCL debug info (#120063)

Summary:
This PR is mainly about flight recorder side of changes that takes a
map of maps as input, and dump it as picklable. Also add functions that
should be compiled only when NCCL_COMM_DUMP is defined
Test Plan:
Integration tests with NCCL would be done later, here we only do the
c10d side of dump test, aka,NCCLTraceTest

Testing the dump function is a bit tricky as we don't have
existing C++ unit tests for them. So we still use the Python NCCLTraceTest with
the python binding of _dump_nccl_trace(), we manually fed the
dump_nccl_trace with a map of test info, and assert the pickle result and
print the converted python dict:
```
(sqzhang_1) [sqzhang@devgpu009.cln1 ~/pytorch (main)]$  python
test/distributed/test_c10d_nccl.py NCCLTraceTest
NCCL version 2.19.3+cuda12.0
[rank0]:[E ProcessGroupNCCL.cpp:1200] [PG 0 Rank 0] ProcessGroupNCCL
preparing to dump debug info.
.NCCL version 2.19.3+cuda12.0
.NCCL version 2.19.3+cuda12.0
{'ncclID2': {'Key2': 'Value2', 'Key1': 'Value1'}, 'ncclID1': {'Key2':
'Value2', 'Key1': 'Value1'}}
{'ncclID2': {'Key2': 'Value2', 'Key1': 'Value1'}, 'ncclID1': {'Key2':
'Value2', 'Key1': 'Value1'}}
.NCCL version 2.19.3+cuda12.0
{'ncclID2': {'Key2': 'Value2', 'Key1': 'Value1'}, 'ncclID1': {'Key2':
'Value2', 'Key1': 'Value1'}}
{'ncclID2': {'Key2': 'Value2', 'Key1': 'Value1'}, 'ncclID1': {'Key2':
'Value2', 'Key1': 'Value1'}}
.NCCL version 2.19.3+cuda12.0
.NCCL version 2.19.3+cuda12.0
.NCCL version 2.19.3+cuda12.0
.NCCL version 2.19.3+cuda12.0
.
----------------------------------------------------------------------
Ran 8 tests in 95.761s
OK
```

Pull Request resolved: https://github.com/pytorch/pytorch/pull/120063
Approved by: https://github.com/wconstab
This commit is contained in:
Shuqiang Zhang
2024-02-20 14:29:08 -08:00
committed by PyTorch MergeBot
parent 06bc203c7b
commit a24cba35b0
3 changed files with 62 additions and 2 deletions

View File

@ -300,6 +300,20 @@ class NCCLComm {
}
#endif
#ifdef IS_NCCL_EXP
#ifdef NCCL_COMM_DUMP
std::unordered_map<std::string, std::string> ncclCommDump() {
std::unordered_map<std::string, std::string> dump;
if (isAborted()) {
LOG(INFO) << "Communicator was aborted before trying to dump its state.";
return dump;
}
C10D_NCCL_CHECK(::ncclCommDump(ncclComm_, dump), c10::nullopt);
return dump;
}
#endif
#endif
ncclUniqueId getNcclId() {
return ncclId_;
}

View File

@ -309,9 +309,36 @@ void cacheAllocatorDeregisterHook(
}
}
#ifdef IS_NCCL_EXP
#ifdef NCCL_COMM_DUMP
std::string dump_nccl_trace() {
return NCCLTraceBuffer::get()->dump();
std::unordered_map<
std::string /* ncclUniqueID */,
std::unordered_map<std::string, std::string> /* dump from this comm */>
ncclDumpMap;
// dump_nccl_trace is only called from the default PG (uid_=0), but we want to
// dump from all comms so we need to iterate over ncclCommDevIdxMap, which
// is static
std::vector<std::shared_ptr<NCCLComm>> allNCCLComms;
// within the critical section, we don't want to dump while holding the lock
// as dump might hang
ncclCommDevIdxMapMutex.lock();
for (auto& [ncclComm, _] : ncclCommDevIdxMap) {
allNCCLComms.push_back(ncclComm);
}
ncclCommDevIdxMapMutex.unlock();
for (auto& ncclComm : allNCCLComms) {
std::string ncclUniqueIDStr = buildNcclUniqueIdStr(ncclComm->getNcclId());
ncclDumpMap[ncclUniqueIDStr] = ncclComm->ncclCommDump();
}
return NCCLTraceBuffer::get()->dump(ncclDumpMap);
}
#endif
#else
std::string dump_nccl_trace() {
return NCCLTraceBuffer::get()->dump(c10::nullopt);
}
#endif
c10::optional<std::function<std::string()>>& get_cpp_trace_dumper() {
static c10::optional<std::function<std::string()>> dumper(c10::nullopt);

View File

@ -565,10 +565,14 @@ struct NCCLTraceBuffer {
entry->start_ = entry->end_ = nullptr;
}
std::string dump() {
std::string dump(
const c10::optional<std::unordered_map<
std::string,
std::unordered_map<std::string, std::string>>>& ncclDumpMap) {
auto result = dump_entries();
auto entries = new_list();
c10::IValue entries_key = "entries";
c10::IValue nccl_comm_key = "nccl_comm_state";
c10::IValue version_key = "version";
// Update whenever changing contents or formatting of the dump
// (minor when adding fields, major when changing existing fields)
@ -661,9 +665,24 @@ struct NCCLTraceBuffer {
entries.push_back(dict);
}
// convert ncclDumpMap into a dictionary
auto per_comm_dict = new_dict();
if (ncclDumpMap.has_value()) {
for (const auto& [ncclId, ncclDump] : ncclDumpMap.value()) {
auto inner_dict = new_dict();
for (const auto& [key, value] : ncclDump) {
inner_dict.insert(key, value);
}
per_comm_dict.insert(ncclId, inner_dict);
}
}
auto dict = new_dict();
dict.insert(entries_key, entries);
dict.insert(version_key, version_val);
if (per_comm_dict.size() > 0) {
dict.insert(nccl_comm_key, per_comm_dict);
}
return pickle_str(dict);
}