Compare commits

...

2 Commits

Author SHA1 Message Date
bc0ea1be43 change reset 2025-10-16 08:08:22 -07:00
2505ba2717 debug pg 2025-10-16 08:08:22 -07:00
7 changed files with 145 additions and 29 deletions

View File

@ -7,7 +7,23 @@ namespace c10d {
void DebugInfoWriter::write(const std::string& trace) {
std::string filename = filename_;
if (enable_dynamic_filename_) {
LOG(INFO) << "Writing Flight Recorder debug info to a dynamic file name";
filename = c10::str(getCvarString({"TORCH_FR_DUMP_TEMP_FILE"}, ""), rank_);
// Check if filename contains a "/" and create directory if needed
size_t slashPos = filename.find_last_of('/');
if (slashPos != std::string::npos) {
// Extract directory path (everything before the last '/')
std::string dirPath = filename.substr(0, slashPos);
try {
c10::filesystem::create_directories(dirPath);
LOG(INFO) << "Created directory for Flight Recorder output: " << dirPath;
} catch (const std::exception& e) {
LOG(ERROR) << "Failed to create directory " << dirPath << ": " << e.what();
}
}
} else {
LOG(INFO) << "Writing Flight Recorder debug info to a static file name";
}
// Open a file for writing. The ios::binary flag is used to write data as
// binary.

View File

@ -108,12 +108,14 @@ struct FlightRecorder {
capture_cpp_stack_ = getCvarBool(
{"TORCH_FR_CPP_STACK", "TORCH_NCCL_TRACE_CPP_STACK"}, false);
enabled_ = max_entries_ > 0;
reset_epoch_start_idx_[0] = 0;
}
struct Entry {
size_t id_; // incremented id in the trace buffer
// used to figure out where in the circular entries
// buffer this entry will be located to
// update state information
size_t reset_epoch_; // epoch when this entry was created
size_t pg_id_;
std::tuple<std::string, std::string> pg_name_; // <group_name, group_desc>
@ -183,12 +185,19 @@ struct FlightRecorder {
size_t max_entries_ = 0;
size_t next_ = 0;
size_t id_ = 0;
size_t reset_epoch_ = 0;
std::unordered_map<size_t, size_t> reset_epoch_start_idx_; // maps reset_epoch to the idx where it starts
std::map<size_t, std::shared_ptr<ProcessGroupStatus>> all_pg_status_;
std::map<std::tuple<std::string, std::string>, std::vector<uint64_t>>
pg_name_to_ranks_;
std::string comm_lib_version_;
std::optional<size_t> record(
// Handle returned by record() that identifies one recorded trace entry.
// Both fields are std::nullopt when record() is called while the recorder is
// disabled; otherwise they carry the values the entry was created with.
struct TraceIdentifier {
// Value of the recorder's id_ counter assigned to the entry by record().
std::optional<size_t> id;
// Value of the recorder's reset_epoch_ at the time record() created the
// entry; passed back to getEntry()/retire_id() together with `id`.
std::optional<size_t> reset_epoch;
};
TraceIdentifier record(
size_t pg_id,
const std::tuple<std::string, std::string>& pg_name,
size_t collective_seq_id,
@ -213,9 +222,15 @@ struct FlightRecorder {
std::vector<Entry> dump_entries();
// Returns the entry with the given id, if it exists. Otherwise, returns
// Returns the index in entries_ for the given id and reset_epoch.
// Caller must hold mutex_lock before calling this method.
size_t getIdxFromId(size_t id, size_t reset_epoch) const;
// Returns the entry with the given id and reset_epoch, if it exists. Otherwise, returns
// std::nullopt.
TORCH_API std::optional<Entry> getEntry(std::optional<size_t> id);
TORCH_API std::optional<Entry> getEntry(
std::optional<size_t> id,
std::optional<size_t> reset_epoch);
/*
Mark an Event as completed and free its events.
@ -229,6 +244,7 @@ struct FlightRecorder {
*/
TORCH_API void retire_id(
std::optional<size_t> id,
std::optional<size_t> reset_epoch,
bool compute_duration = true);
TORCH_API void reset_all();

View File

@ -39,7 +39,7 @@ std::string FlightRecorder<EventType>::Entry::getTraceback() {
}
template <typename EventType>
std::optional<size_t> FlightRecorder<EventType>::record(
typename FlightRecorder<EventType>::TraceIdentifier FlightRecorder<EventType>::record(
size_t pg_id,
const std::tuple<std::string, std::string>& pg_name,
size_t collective_seq_id,
@ -54,7 +54,7 @@ std::optional<size_t> FlightRecorder<EventType>::record(
std::shared_ptr<ProcessGroupStatus> pg_status,
bool isP2P) {
if (!enabled_) {
return std::nullopt;
return TraceIdentifier{std::nullopt, std::nullopt};
}
if (all_pg_status_.find(pg_id) == all_pg_status_.end()) {
// Current pg_status is not in FR.
@ -64,8 +64,11 @@ std::optional<size_t> FlightRecorder<EventType>::record(
torch::CapturedTraceback::gather(true, true, capture_cpp_stack_);
std::lock_guard<std::mutex> guard(mutex_);
TORCH_CHECK(reset_epoch_start_idx_.find(reset_epoch_) != reset_epoch_start_idx_.end());
auto te = Entry{
id_,
reset_epoch_,
pg_id,
pg_name,
collective_seq_id,
@ -112,7 +115,8 @@ std::optional<size_t> FlightRecorder<EventType>::record(
next_ = 0;
}
}
return id_++;
const auto id = id_++;
return TraceIdentifier{id, reset_epoch_};
}
template <typename EventType>
@ -161,8 +165,10 @@ template <typename EventType>
std::vector<typename FlightRecorder<EventType>::Entry> FlightRecorder<
EventType>::dump_entries() {
std::vector<Entry> result;
size_t current_epoch;
{
std::lock_guard<std::mutex> guard(mutex_);
current_epoch = reset_epoch_;
result.reserve(entries_.size());
result.insert(
result.end(),
@ -178,32 +184,57 @@ std::vector<typename FlightRecorder<EventType>::Entry> FlightRecorder<
update_state(r);
r.start_ = r.end_ = nullptr;
}
// Filter out entries from previous epochs
// Only keep entries where reset_epoch_ == current_epoch
result.erase(
std::remove_if(
result.begin(),
result.end(),
[current_epoch](const Entry& e) {
return e.reset_epoch_ < current_epoch;
}),
result.end());
return result;
}
template <typename EventType>
// Returns the entry with the given id, if it exists. Otherwise, returns
// Returns the index in entries_ for the given id and reset_epoch.
//
// The index is the position where `reset_epoch` started in the circular
// buffer (looked up in reset_epoch_start_idx_) plus `id`, wrapped around
// max_entries_.
//
// Caller must hold mutex_lock before calling this method.
//
// Fails a TORCH_CHECK if the buffer has zero capacity (the modulo below would
// otherwise divide by zero) or if `reset_epoch` has no recorded start index.
size_t FlightRecorder<EventType>::getIdxFromId(size_t id, size_t reset_epoch) const {
  TORCH_CHECK(
      max_entries_ > 0,
      "FlightRecorder::getIdxFromId called with max_entries_ == 0");
  // Look up the starting idx for the given reset epoch
  const auto epoch_start = reset_epoch_start_idx_.find(reset_epoch);
  TORCH_CHECK(
      epoch_start != reset_epoch_start_idx_.end(),
      "FlightRecorder::getIdxFromId: no start index recorded for reset_epoch ",
      reset_epoch);
  // Calculate idx based on where the epoch started
  return (epoch_start->second + id) % max_entries_;
}
template <typename EventType>
// Returns the entry with the given id and reset_epoch, if it exists. Otherwise, returns
// std::nullopt.
std::optional<typename FlightRecorder<EventType>::Entry> FlightRecorder<
EventType>::getEntry(std::optional<size_t> id) {
if (!enabled_ || !id) {
EventType>::getEntry(
std::optional<size_t> id,
std::optional<size_t> reset_epoch) {
if (!enabled_ || !id || !reset_epoch) {
return std::nullopt;
}
std::unique_lock<std::mutex> guard(mutex_);
Entry entry = entries_.at(*id % max_entries_);
if (entry.id_ == *id) {
Entry entry = entries_.at(getIdxFromId(*id, *reset_epoch));
if (entry.id_ == *id && entry.reset_epoch_ == *reset_epoch) {
return entry;
} else {
return std::nullopt;
}
return std::nullopt;
}
template <typename EventType>
void FlightRecorder<EventType>::retire_id(
std::optional<size_t> id,
std::optional<size_t> reset_epoch,
bool compute_duration) {
if (!enabled_ || !id) {
if (!enabled_ || !id || !reset_epoch) {
return;
}
@ -214,8 +245,12 @@ void FlightRecorder<EventType>::retire_id(
std::unique_lock<std::mutex> guard(mutex_);
Entry* entry = &entries_.at(*id % max_entries_);
if (entry->id_ == *id) {
auto idx = getIdxFromId(*id, *reset_epoch);
Entry* entry = &entries_.at(idx);
bool entry_matches = (entry->id_ == *id && entry->reset_epoch_ == *reset_epoch);
if (entry_matches) {
update_state(*entry);
if (compute_duration) {
@ -237,8 +272,10 @@ void FlightRecorder<EventType>::retire_id(
guard.lock();
// Refresh the entry pointer, see if the entry has been overwritten
entry = &entries_.at(*id % max_entries_);
if (entry->id_ != *id) {
entry = &entries_.at(getIdxFromId(*id, *reset_epoch));
bool still_matches = (entry->id_ == *id && entry->reset_epoch_ == *reset_epoch);
if (!still_matches) {
LOG(INFO) << "retire_id abandoned for id " << *id
<< ", event was overwritten while waiting to compute duration.";
return;
@ -252,9 +289,13 @@ void FlightRecorder<EventType>::retire_id(
template <typename EventType>
void FlightRecorder<EventType>::reset_all() {
std::lock_guard<std::mutex> guard(mutex_);
next_ = 0;
id_ = 0;
entries_.clear();
if (!entries_.empty()) {
// Soft delete: increment epoch to mark all existing entries as old
// Store where the new epoch starts in the circular buffer
reset_epoch_++;
reset_epoch_start_idx_[reset_epoch_] = next_;
id_ = 0;
}
}
template <typename EventType>

View File

@ -708,7 +708,7 @@ void ProcessGroupGloo::runLoop(int workerIndex) {
// TODO: We need to have numel of tensors for gloo as well.
pgStatus_->lastCompletedNumelIn = 0;
pgStatus_->lastCompletedNumelOut = 0;
FlightRecorder<c10::Event>::get()->retire_id(work->trace_id_, false);
FlightRecorder<c10::Event>::get()->retire_id(work->trace_id_, work->trace_reset_epoch_, false);
lock.lock();
workInProgress_[workerIndex].reset();
}
@ -780,7 +780,7 @@ void ProcessGroupGloo::enqueue(c10::intrusive_ptr<AsyncWork> work) {
pgStatus_->lastEnqueuedNumelOut = 0;
// using c10d::FlightRecorder;
// TODO: We need to have a way to use c10::Event inside gloo as well.
work->trace_id_ = FlightRecorder<c10::Event>::get()->record(
auto traceId = FlightRecorder<c10::Event>::get()->record(
local_id_,
std::make_tuple(pg_uid_, pg_desc_),
collectiveCounter_,
@ -795,6 +795,8 @@ void ProcessGroupGloo::enqueue(c10::intrusive_ptr<AsyncWork> work) {
work->getTimeout(),
pgStatus_,
false);
work->trace_id_ = traceId.id;
work->trace_reset_epoch_ = traceId.reset_epoch;
workQueue_.push_back(std::move(work));
lock.unlock();

View File

@ -99,6 +99,7 @@ class TORCH_API ProcessGroupGloo : public Backend {
// unique id used to tell the trace buffer that this
// work has completed
std::optional<uint64_t> trace_id_;
std::optional<uint64_t> trace_reset_epoch_;
std::shared_ptr<gloo::Context> context_;
const std::chrono::milliseconds timeout_;

View File

@ -14,6 +14,7 @@
#include <c10/cuda/CUDAAllocatorConfig.h>
#include <c10/cuda/CUDAGraphsC10Utils.h>
#include <c10/cuda/CUDAGuard.h>
#include <c10/util/Backtrace.h>
#include <c10/util/Exception.h>
#include <c10/util/Logging.h>
#include <c10/util/WaitCounter.h>
@ -31,6 +32,7 @@
#include <torch/csrc/distributed/c10d/TraceUtils.h>
#include <torch/csrc/distributed/c10d/Utils.hpp>
#include <torch/csrc/distributed/c10d/cuda/utils.hpp>
#include <torch/csrc/profiler/combined_traceback.h>
#include <torch/torch.h>
#include <optional>
@ -68,6 +70,16 @@ inline bool isUnsupportedFloat8(at::ScalarType t) {
);
}
// Logs, at ERROR level, one line per frame of the traceback stored at
// `traceback_index` in `st.tracebacks`. Each element of that traceback is an
// index into `st.all_frames`; the frame's file name, line number and function
// name are printed. No bounds checking is performed on `traceback_index`.
void print_traceback(const torch::SymbolizedTracebacks& st,
    size_t traceback_index) {
  for (const uint64_t frame_idx : st.tracebacks[traceback_index]) {
    const auto& fr = st.all_frames[frame_idx];
    LOG(ERROR) << " File \"" << fr.filename << "\", line "
               << fr.lineno << ", in " << fr.funcname << "\n";
  }
}
#ifdef ENABLE_NCCL_PREMUL_SUM_SUPPORT
template <typename T, ncclDataType_t dataType>
ncclRedOpRAII unpackPreMulSum(
@ -704,9 +716,9 @@ bool ProcessGroupNCCL::WorkNCCL::checkTimeout(
// Print the traceback of the collective at call time
std::string ProcessGroupNCCL::WorkNCCL::getTraceback() const {
// First step we get the corresponding record entry from FR, based on work's
// trace_id_
// trace_id_ and trace_reset_epoch_
std::optional<FlightRecorderCUDA::Entry> entry =
FlightRecorderCUDA::get()->getEntry(trace_id_);
FlightRecorderCUDA::get()->getEntry(trace_id_, trace_reset_epoch_);
if (entry.has_value()) {
auto entryVal = entry.value();
// Get stack trace from FR entry, in string format
@ -2090,6 +2102,19 @@ void ProcessGroupNCCL::Watchdog::run() {
"Process group watchdog thread terminated with exception: ",
e.what());
LOG(ERROR) << exitMsg;
LOG(ERROR) << "Backtrace:";
LOG(ERROR) << c10::get_lazy_backtrace();
std::shared_ptr<torch::CapturedTraceback> tb0 =
torch::CapturedTraceback::gather(/*python=*/true, /*script=*/true, /*cpp=*/true);
std::shared_ptr<torch::CapturedTraceback> tb1 =
torch::CapturedTraceback::gather(/*python=*/true, /*script=*/true, /*cpp=*/true);
torch::SymbolizedTracebacks r = torch::symbolize({tb0.get(), tb1.get()});
print_traceback(r, 0);
print_traceback(r, 1);
std::abort();
if (C10_LIKELY(rethrowCUDAErrors_) ||
!(std::string(e.what()).find("CUDA Error"))) {
// TODO(whc) clean up the rethrow - why is it stored in a class var and
@ -2394,7 +2419,7 @@ void ProcessGroupNCCL::Watchdog::runLoop() {
pg_->pgStatus_->lastCompletedWorkName = opTypeToString(work.opType_);
pg_->pgStatus_->lastCompletedNumelIn = work.numelIn_;
pg_->pgStatus_->lastCompletedNumelOut = work.numelOut_;
FlightRecorderCUDA::get()->retire_id(work.trace_id_, true);
FlightRecorderCUDA::get()->retire_id(work.trace_id_, work.trace_reset_epoch_, true);
if (pg_->onCompletionHook_) {
// Move Work object to completedWorkList_ to be consumed by the hook
// thread
@ -3360,7 +3385,7 @@ c10::intrusive_ptr<ProcessGroupNCCL::WorkNCCL> ProcessGroupNCCL::initWork(
// these objects to the Work because it has implications for keeping those
// tensors alive longer and adds overhead when copying Work objects
// between threads
r->trace_id_ = FlightRecorderCUDA::get()->record(
auto traceId = FlightRecorderCUDA::get()->record(
local_id_,
std::make_tuple(pg_uid_, pg_desc_),
seqCollective_,
@ -3374,6 +3399,8 @@ c10::intrusive_ptr<ProcessGroupNCCL::WorkNCCL> ProcessGroupNCCL::initWork(
options_->timeout,
pgStatus_,
isP2P);
r->trace_id_ = traceId.id;
r->trace_reset_epoch_ = traceId.reset_epoch;
}
return r;
}
@ -4168,7 +4195,7 @@ c10::intrusive_ptr<Work> ProcessGroupNCCL::pointToPoint(
// TODO(whc) because we don't pass output {tensor} to initWork, we tell
// initWork to not record, and then we manually call record passing all the
// information it wants.
work->trace_id_ = FlightRecorderCUDA::get()->record(
auto traceId = FlightRecorderCUDA::get()->record(
local_id_,
std::make_tuple(pg_uid_, pg_desc_),
seqCollective_,
@ -4182,6 +4209,8 @@ c10::intrusive_ptr<Work> ProcessGroupNCCL::pointToPoint(
options_->timeout,
pgStatus_,
/*isP2P=*/true);
work->trace_id_ = traceId.id;
work->trace_reset_epoch_ = traceId.reset_epoch;
}
// Only check for NaN for send ops, for recv ops `tensor` can be a random

View File

@ -195,10 +195,17 @@ static std::vector<std::string> TORCH_NCCL_USE_TENSOR_REGISTER_ALLOCATOR_HOOK =
#if defined(__linux__)
struct DumpPipe {
DumpPipe(int rank) {
LOG(INFO) << "DumpPipe works! Rank: " << rank;
std::string fileStem =
getCvarString({"TORCH_NCCL_DEBUG_INFO_PIPE_FILE"}, "");
if (fileStem.empty() ||
getCvarInt({"TORCH_NCCL_TRACE_BUFFER_SIZE"}, 0) <= 0) {
if (fileStem.empty()) {
LOG(INFO) << "DumpPipe is not enabled. Empty file";
}
if (getCvarInt({"TORCH_NCCL_TRACE_BUFFER_SIZE"}, 0) <= 0) {
LOG(INFO) << "DumpPipe is not enabled. Trace buffer size is 0";
}
return;
}
TORCH_CHECK(!fileStem.empty(), "TORCH_NCCL_DEBUG_INFO_PIPE_FILE is empty");
@ -243,8 +250,11 @@ struct DumpPipe {
};
#else
struct DumpPipe {
DumpPipe(int rank) {}
DumpPipe(int rank) {
LOG(INFO) << rank << ": DumpPipe is only supported on Linux.";
}
bool shouldDump() {
LOG(INFO) << "Cannot dump. DumpPipe is only supported on Linux";
return false;
}
};
@ -505,6 +515,7 @@ class TORCH_API ProcessGroupNCCL : public Backend {
// unique id used to tell the trace buffer that this
// work has completed
std::optional<uint64_t> trace_id_;
std::optional<uint64_t> trace_reset_epoch_;
DebugLevel distDebugLevel_;
friend class ProcessGroupNCCL;
};