[c10d][fr] Shrink the range of mutex lock to avoid deadlock (#155949)

While looking into a case when FR dump (actual dump not monitoring thread) takes 30 mins, I realized that our global write lock is grabbed too early so the second effort to dump FR without stack trace will fail because of a deadlock because the global write lock is still hold. So we should only grab the lock when we are ready to write so that we are less likely to keep the lock forever. Also I did an audit to the lock within FR as well and found that there is one place we can shrink as well.

Pull Request resolved: https://github.com/pytorch/pytorch/pull/155949
Approved by: https://github.com/Skylion007
This commit is contained in:
fduwjj
2025-06-13 14:14:37 -07:00
committed by PyTorch MergeBot
parent 3159ee2ad3
commit b8aee84fb9
5 changed files with 18 additions and 14 deletions

View File

@ -2406,7 +2406,7 @@ class ProcessGroupGlooFRTest(ProcessGroupGlooTest):
def _verify_trace(self, t, is_json):
ver = t["version"]
self.assertEqual(ver, "2.7")
self.assertEqual(ver, "2.8")
pg_config = t["pg_config"]
self.assertEqual(len(pg_config), 1)
default_pg_info = pg_config["0"]

View File

@ -4257,7 +4257,7 @@ class NCCLTraceTestBase(MultiProcessTestCase):
class NCCLTraceTest(NCCLTraceTestBase):
def _verify_trace(self, t, include_collectives, timing_enabled, is_json):
ver = t["version"]
self.assertEqual(ver, "2.7")
self.assertEqual(ver, "2.8")
nccl_version = t["nccl_version"]
torch_nccl_version = torch.cuda.nccl.version()
self.assertEqual(nccl_version, ".".join(str(v) for v in torch_nccl_version))

View File

@ -20,7 +20,7 @@ namespace c10d {
// (minor when adding fields, major when changing existing fields)
// Also update both JSON and Pickle dumps to make use of the newly defined
// field(s).
DEFINE_CONSTANT(version_val, "2.7")
DEFINE_CONSTANT(version_val, "2.8")
DEFINE_CONSTANT(entries_key, "entries")
DEFINE_CONSTANT(nccl_comm_key, "nccl_comm_state")
DEFINE_CONSTANT(nccl_version_key, "nccl_version")

View File

@ -160,17 +160,19 @@ void FlightRecorder<EventType>::update_state(Entry& r) {
template <typename EventType>
std::vector<typename FlightRecorder<EventType>::Entry> FlightRecorder<
EventType>::dump_entries() {
std::lock_guard<std::mutex> guard(mutex_);
std::vector<Entry> result;
result.reserve(entries_.size());
result.insert(
result.end(),
entries_.begin() + static_cast<std::ptrdiff_t>(next_),
entries_.end());
result.insert(
result.end(),
entries_.begin(),
entries_.begin() + static_cast<std::ptrdiff_t>(next_));
{
std::lock_guard<std::mutex> guard(mutex_);
result.reserve(entries_.size());
result.insert(
result.end(),
entries_.begin() + static_cast<std::ptrdiff_t>(next_),
entries_.end());
result.insert(
result.end(),
entries_.begin(),
entries_.begin() + static_cast<std::ptrdiff_t>(next_));
}
// query any remaining events
for (auto& r : result) {
update_state(r);

View File

@ -1549,7 +1549,6 @@ bool ProcessGroupNCCL::dumpDebuggingInfo(bool includeStackTrace /*=true*/) {
// multiple calls in one runtime. User is responsible for preserving the
// output file from an earlier call before a later call overwrites it.
static std::mutex writeDebugInfoMutex;
std::lock_guard<std::mutex> lock(writeDebugInfoMutex);
LOG(ERROR)
<< logPrefix()
<< "ProcessGroupNCCL preparing to dump debug info. Include stack trace: "
@ -1559,6 +1558,9 @@ bool ProcessGroupNCCL::dumpDebuggingInfo(bool includeStackTrace /*=true*/) {
// their customized writer by inheriting `DebugInfoWriter` via
// `registerDebugInfoWriter`.
auto ncclTrace = dump_nccl_trace(true, includeStackTrace, false);
// dump_nccl_trace will hang so we don't grab the global lock until we get
// the trace.
std::lock_guard<std::mutex> lock(writeDebugInfoMutex);
DebugInfoWriter& writer = DebugInfoWriter::getWriter(globalRank());
LOG(INFO) << logPrefix() << "ProcessGroupNCCL dumping nccl trace to "
<< writer.getWriterTarget();