Compare commits

...

3 Commits

Author SHA1 Message Date
796115bc9f add backtrace 2025-10-21 09:08:05 -07:00
a382ea57e5 fix fr reset api
Summary:
- there are various places that access fr's `entries_` field
- if we empty `entries_` on reset, those accesses can result in an error
- so we only perform a soft delete instead of clearing out the entries completely (see the sketch below the changed-files summary):
  - only reset `id_` on reset
  - keep track of a `reset_epoch_` that increments every time reset is called
  - `dump_entries` only returns entries from the latest epoch
  - APIs that access entries also check that the reset epoch matches
2025-10-21 09:08:05 -07:00
5062482911 update fr trace analysis
Summary:
- allow empty entries from ranks
- allow dumps to be missing from some ranks
2025-10-21 09:08:02 -07:00
7 changed files with 133 additions and 43 deletions
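
The soft-delete scheme described in `a382ea57e5` can be illustrated outside the patch with a small standalone ring buffer. This is only a sketch: `MiniRecorder`, its argument-free `record()`, and the unconditional advance of `next_` are simplifications introduced here; the `(id, reset_epoch)` addressing, the `reset_epoch_start_idx_` bookkeeping, and the double check on lookup mirror the FlightRecorder diff below.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <unordered_map>
#include <utility>
#include <vector>

// Toy ring buffer showing the (id, reset_epoch) addressing scheme. An entry is
// reduced to its identifiers; the real Entry carries the full trace record.
struct MiniRecorder {
  struct Entry {
    size_t id;
    size_t reset_epoch;
  };

  explicit MiniRecorder(size_t max_entries) : max_entries_(max_entries) {
    reset_epoch_start_idx_[0] = 0;
  }

  // Append an entry and return the pair a caller must keep to find it again.
  std::pair<size_t, size_t> record() {
    Entry e{id_, reset_epoch_};
    if (entries_.size() < max_entries_) {
      // Still filling: push_back writes slot entries_.size(), which equals next_.
      entries_.push_back(e);
    } else {
      // Full: overwrite the oldest slot.
      entries_[next_] = e;
    }
    next_ = (next_ + 1) % max_entries_;
    return {id_++, reset_epoch_};
  }

  // ids restart at 0 after every reset, so a slot is "where that epoch started
  // writing" plus the id, modulo the buffer size.
  size_t idx_from_id(size_t id, size_t reset_epoch) const {
    return (reset_epoch_start_idx_.at(reset_epoch) + id) % max_entries_;
  }

  // Lookups re-check both fields: the slot may have been reused by a newer
  // entry, possibly from a newer epoch.
  std::optional<Entry> get(size_t id, size_t reset_epoch) const {
    size_t idx = idx_from_id(id, reset_epoch);
    if (idx >= entries_.size()) {
      return std::nullopt;
    }
    Entry e = entries_[idx];
    if (e.id == id && e.reset_epoch == reset_epoch) {
      return e;
    }
    return std::nullopt;
  }

  // Soft delete: nothing is erased. The epoch is bumped, the new epoch's start
  // index is pinned to the current write position, and ids restart at 0.
  void reset() {
    if (!entries_.empty()) {
      reset_epoch_++;
      reset_epoch_start_idx_[reset_epoch_] = next_;
      id_ = 0;
    }
  }

  size_t max_entries_;
  size_t next_ = 0;
  size_t id_ = 0;
  size_t reset_epoch_ = 0;
  std::unordered_map<size_t, size_t> reset_epoch_start_idx_;
  std::vector<Entry> entries_;
};

int main() {
  MiniRecorder r(/*max_entries=*/4);
  auto [id0, ep0] = r.record();         // epoch 0, id 0
  r.reset();                            // soft delete: epoch becomes 1
  auto [id1, ep1] = r.record();         // epoch 1, id restarts at 0
  assert(id0 == id1);                   // both ids are 0...
  assert(r.get(id1, ep1).has_value());  // ...but the epoch disambiguates them
  assert(r.get(id0, ep0).has_value());  // old entry still present until overwritten
  return 0;
}
```

Old entries are never erased; they stay physically present, and readable through their own `(id, reset_epoch)` pair, until the ring buffer overwrites their slot. That is why `dump_entries()` in the diff filters by the current epoch.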

View File

@@ -754,6 +754,10 @@ def align_trace_from_beginning(
# Rank 3: [0, 1, 2, 3, 4, 5, None]
# Then we should start from collective 2, not 0, because for any collective
# before that we don't have complete records from all ranks, so we ignore them.
# If we don't have any trace from some ranks, ignore them
# as well.
if len(entries[rank]) == 0:
continue
first_record_id = entries[rank][0]["record_id"]
maximum_starting_record_id = max(maximum_starting_record_id, first_record_id)

View File

@@ -108,12 +108,14 @@ struct FlightRecorder {
capture_cpp_stack_ = getCvarBool(
{"TORCH_FR_CPP_STACK", "TORCH_NCCL_TRACE_CPP_STACK"}, false);
enabled_ = max_entries_ > 0;
reset_epoch_start_idx_[0] = 0;
}
struct Entry {
size_t id_; // incremented id in the trace buffer
// used to figure out where in the circular entries
// buffer this entry will be located to
// update state information
size_t reset_epoch_; // epoch when this entry was created
size_t pg_id_;
std::tuple<std::string, std::string> pg_name_; // <group_name, group_desc>
@@ -183,12 +185,20 @@ struct FlightRecorder {
size_t max_entries_ = 0;
size_t next_ = 0;
size_t id_ = 0;
size_t reset_epoch_ = 0;
std::unordered_map<size_t, size_t>
reset_epoch_start_idx_; // maps reset_epoch to the idx where it starts
std::map<size_t, std::shared_ptr<ProcessGroupStatus>> all_pg_status_;
std::map<std::tuple<std::string, std::string>, std::vector<uint64_t>>
pg_name_to_ranks_;
std::string comm_lib_version_;
std::optional<size_t> record(
struct TraceIdentifier {
std::optional<size_t> id;
std::optional<size_t> reset_epoch;
};
TraceIdentifier record(
size_t pg_id,
const std::tuple<std::string, std::string>& pg_name,
size_t collective_seq_id,
@@ -213,9 +223,15 @@ struct FlightRecorder {
std::vector<Entry> dump_entries();
// Returns the entry with the given id, if it exists. Otherwise, returns
// std::nullopt.
TORCH_API std::optional<Entry> getEntry(std::optional<size_t> id);
// Returns the index in entries_ for the given id and reset_epoch.
// Caller must hold mutex_lock before calling this method.
size_t getIdxFromId(size_t id, size_t reset_epoch) const;
// Returns the entry with the given id and reset_epoch, if it exists.
// Otherwise, returns std::nullopt.
TORCH_API std::optional<Entry> getEntry(
std::optional<size_t> id,
std::optional<size_t> reset_epoch);
/*
Mark an Event as completed and free its events.
@@ -229,6 +245,7 @@ struct FlightRecorder {
*/
TORCH_API void retire_id(
std::optional<size_t> id,
std::optional<size_t> reset_epoch,
bool compute_duration = true);
TORCH_API void reset_all();
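
For call sites, the practical effect of these header changes is that `record()` now returns two values that must be stored and passed back together. Below is a minimal caller-side sketch, using stand-in `TraceIdentifier`/`Work` types and a dummy `record_collective()` in place of the real `FlightRecorder<EventType>::record()` (which takes the pg/seq/op/tensor/timeout arguments shown in the .cpp diff further down).

```cpp
#include <cstddef>
#include <cstdint>
#include <optional>

// Stand-in types: the names follow the patch, but the real record() also takes
// many more arguments.
struct TraceIdentifier {
  std::optional<size_t> id;
  std::optional<size_t> reset_epoch;
};

struct Work {
  std::optional<uint64_t> trace_id_;
  std::optional<uint64_t> trace_reset_epoch_;
};

// Dummy recorder call standing in for FlightRecorder<EventType>::record():
// returns a fake identifier, or nullopts when recording is disabled.
TraceIdentifier record_collective(bool enabled) {
  if (!enabled) {
    return TraceIdentifier{std::nullopt, std::nullopt};
  }
  return TraceIdentifier{0, 0};
}

void enqueue(Work& work, bool recorder_enabled) {
  // record() now returns both halves of the identifier...
  const TraceIdentifier trace = record_collective(recorder_enabled);
  // ...and the caller stores them side by side, so getEntry()/retire_id() can
  // later verify that the entry still belongs to the same reset epoch.
  work.trace_id_ = trace.id;
  work.trace_reset_epoch_ = trace.reset_epoch;
}

int main() {
  Work w;
  enqueue(w, /*recorder_enabled=*/true);
  // w.trace_id_ and w.trace_reset_epoch_ now identify one recorded entry.
  return 0;
}
```

The disabled-recorder path stays cheap: `record()` returns `{std::nullopt, std::nullopt}`, and `getEntry()`/`retire_id()` bail out early whenever either half of the identifier is missing.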

View File

@@ -39,22 +39,23 @@ std::string FlightRecorder<EventType>::Entry::getTraceback() {
}
template <typename EventType>
std::optional<size_t> FlightRecorder<EventType>::record(
size_t pg_id,
const std::tuple<std::string, std::string>& pg_name,
size_t collective_seq_id,
size_t p2p_seq_id,
size_t op_id,
std::string profiling_name,
const std::vector<at::Tensor>& inputs,
const std::vector<at::Tensor>& outputs,
EventType* start,
EventType* end,
std::chrono::milliseconds timeout_ms,
std::shared_ptr<ProcessGroupStatus> pg_status,
bool isP2P) {
typename FlightRecorder<EventType>::TraceIdentifier FlightRecorder<EventType>::
record(
size_t pg_id,
const std::tuple<std::string, std::string>& pg_name,
size_t collective_seq_id,
size_t p2p_seq_id,
size_t op_id,
std::string profiling_name,
const std::vector<at::Tensor>& inputs,
const std::vector<at::Tensor>& outputs,
EventType* start,
EventType* end,
std::chrono::milliseconds timeout_ms,
std::shared_ptr<ProcessGroupStatus> pg_status,
bool isP2P) {
if (!enabled_) {
return std::nullopt;
return TraceIdentifier{std::nullopt, std::nullopt};
}
if (all_pg_status_.find(pg_id) == all_pg_status_.end()) {
// Current pg_status is not in FR.
@@ -64,8 +65,13 @@ std::optional<size_t> FlightRecorder<EventType>::record(
torch::CapturedTraceback::gather(true, true, capture_cpp_stack_);
std::lock_guard<std::mutex> guard(mutex_);
TORCH_CHECK(
reset_epoch_start_idx_.find(reset_epoch_) !=
reset_epoch_start_idx_.end());
auto te = Entry{
id_,
reset_epoch_,
pg_id,
pg_name,
collective_seq_id,
@@ -112,7 +118,8 @@ std::optional<size_t> FlightRecorder<EventType>::record(
next_ = 0;
}
}
return id_++;
const auto id = id_++;
return TraceIdentifier{id, reset_epoch_};
}
template <typename EventType>
@@ -161,8 +168,10 @@ template <typename EventType>
std::vector<typename FlightRecorder<EventType>::Entry> FlightRecorder<
EventType>::dump_entries() {
std::vector<Entry> result;
size_t current_epoch;
{
std::lock_guard<std::mutex> guard(mutex_);
current_epoch = reset_epoch_;
result.reserve(entries_.size());
result.insert(
result.end(),
@@ -178,32 +187,57 @@ std::vector<typename FlightRecorder<EventType>::Entry> FlightRecorder<
update_state(r);
r.start_ = r.end_ = nullptr;
}
// Filter out entries from previous epochs
// Only keep entries where reset_epoch_ == current_epoch
result.erase(
std::remove_if(
result.begin(),
result.end(),
[current_epoch](const Entry& e) {
return e.reset_epoch_ < current_epoch;
}),
result.end());
return result;
}
template <typename EventType>
// Returns the entry with the given id, if it exists. Otherwise, returns
// std::nullopt.
// Returns the index in entries_ for the given id and reset_epoch.
// Caller must hold mutex_lock before calling this method.
size_t FlightRecorder<EventType>::getIdxFromId(size_t id, size_t reset_epoch)
const {
// Look up the starting idx for the given reset epoch
auto it = reset_epoch_start_idx_.find(reset_epoch);
TORCH_CHECK(it != reset_epoch_start_idx_.end());
// Calculate idx based on where the epoch started
return (it->second + id) % max_entries_;
}
template <typename EventType>
// Returns the entry with the given id and reset_epoch, if it exists. Otherwise,
// returns std::nullopt.
std::optional<typename FlightRecorder<EventType>::Entry> FlightRecorder<
EventType>::getEntry(std::optional<size_t> id) {
if (!enabled_ || !id) {
EventType>::
getEntry(std::optional<size_t> id, std::optional<size_t> reset_epoch) {
if (!enabled_ || !id || !reset_epoch) {
return std::nullopt;
}
std::unique_lock<std::mutex> guard(mutex_);
Entry entry = entries_.at(*id % max_entries_);
if (entry.id_ == *id) {
Entry entry = entries_.at(getIdxFromId(*id, *reset_epoch));
if (entry.id_ == *id && entry.reset_epoch_ == *reset_epoch) {
return entry;
} else {
return std::nullopt;
}
return std::nullopt;
}
template <typename EventType>
void FlightRecorder<EventType>::retire_id(
std::optional<size_t> id,
std::optional<size_t> reset_epoch,
bool compute_duration) {
if (!enabled_ || !id) {
if (!enabled_ || !id || !reset_epoch) {
return;
}
@@ -214,8 +248,8 @@ void FlightRecorder<EventType>::retire_id(
std::unique_lock<std::mutex> guard(mutex_);
Entry* entry = &entries_.at(*id % max_entries_);
if (entry->id_ == *id) {
Entry* entry = &entries_.at(getIdxFromId(*id, *reset_epoch));
if (entry->id_ == *id && entry->reset_epoch_ == *reset_epoch) {
update_state(*entry);
if (compute_duration) {
@@ -237,8 +271,8 @@ void FlightRecorder<EventType>::retire_id(
guard.lock();
// Refresh the entry pointer, see if the entry has been overwritten
entry = &entries_.at(*id % max_entries_);
if (entry->id_ != *id) {
entry = &entries_.at(getIdxFromId(*id, *reset_epoch));
if (!(entry->id_ == *id && entry->reset_epoch_ == *reset_epoch)) {
LOG(INFO) << "retire_id abandoned for id " << *id
<< ", event was overwritten while waiting to compute duration.";
return;
@@ -252,9 +286,13 @@ void FlightRecorder<EventType>::retire_id(
template <typename EventType>
void FlightRecorder<EventType>::reset_all() {
std::lock_guard<std::mutex> guard(mutex_);
next_ = 0;
id_ = 0;
entries_.clear();
if (!entries_.empty()) {
// Soft delete: increment epoch to mark all existing entries as old
// Store where the new epoch starts in the circular buffer
reset_epoch_++;
reset_epoch_start_idx_[reset_epoch_] = next_;
id_ = 0;
}
}
template <typename EventType>
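
To see the new `dump_entries()` epoch filter in isolation, here is a self-contained toy example; the `Entry` struct is a stand-in carrying only the two fields the filter reads, and the buffer contents are made up.

```cpp
#include <algorithm>
#include <cstddef>
#include <iostream>
#include <vector>

// Stand-in entry with only the two fields the epoch filter reads.
struct Entry {
  size_t id;
  size_t reset_epoch;
};

int main() {
  // Hypothetical buffer contents after one reset: two stale entries from
  // epoch 0 and two fresh entries from epoch 1 (ids restart at 0 per epoch).
  std::vector<Entry> result = {{0, 0}, {1, 0}, {0, 1}, {1, 1}};
  const size_t current_epoch = 1;

  // Same erase/remove_if pattern as the patched dump_entries(): drop anything
  // recorded before the current epoch.
  result.erase(
      std::remove_if(
          result.begin(),
          result.end(),
          [current_epoch](const Entry& e) {
            return e.reset_epoch < current_epoch;
          }),
      result.end());

  // Prints only the epoch-1 entries: "id=0 epoch=1" and "id=1 epoch=1".
  for (const auto& e : result) {
    std::cout << "id=" << e.id << " epoch=" << e.reset_epoch << '\n';
  }
  return 0;
}
```

Running it keeps only the epoch-1 entries, matching "dump_entries only returns entries from the latest epoch" in the commit summary.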

View File

@@ -708,7 +708,8 @@ void ProcessGroupGloo::runLoop(int workerIndex) {
// TODO: We need to have numel of tensors for gloo as well.
pgStatus_->lastCompletedNumelIn = 0;
pgStatus_->lastCompletedNumelOut = 0;
FlightRecorder<c10::Event>::get()->retire_id(work->trace_id_, false);
FlightRecorder<c10::Event>::get()->retire_id(
work->trace_id_, work->trace_reset_epoch_, false);
lock.lock();
workInProgress_[workerIndex].reset();
}
@@ -780,7 +781,7 @@ void ProcessGroupGloo::enqueue(c10::intrusive_ptr<AsyncWork> work) {
pgStatus_->lastEnqueuedNumelOut = 0;
// using c10d::FlightRecorder;
// TODO: We need to have a way to use c10::Event inside gloo as well.
work->trace_id_ = FlightRecorder<c10::Event>::get()->record(
auto traceId = FlightRecorder<c10::Event>::get()->record(
local_id_,
std::make_tuple(pg_uid_, pg_desc_),
collectiveCounter_,
@@ -795,6 +796,8 @@ void ProcessGroupGloo::enqueue(c10::intrusive_ptr<AsyncWork> work) {
work->getTimeout(),
pgStatus_,
false);
work->trace_id_ = traceId.id;
work->trace_reset_epoch_ = traceId.reset_epoch;
workQueue_.push_back(std::move(work));
lock.unlock();

View File

@@ -99,6 +99,7 @@ class TORCH_API ProcessGroupGloo : public Backend {
// unique id used to tell the trace buffer that this
// work has completed
std::optional<uint64_t> trace_id_;
std::optional<uint64_t> trace_reset_epoch_;
std::shared_ptr<gloo::Context> context_;
const std::chrono::milliseconds timeout_;

View File

@@ -33,6 +33,10 @@
#include <torch/csrc/distributed/c10d/cuda/utils.hpp>
#include <torch/torch.h>
#include <optional>
#include <exception>
#include <cxxabi.h>
#include <execinfo.h>
#include <dlfcn.h>
namespace c10d {
@@ -41,6 +45,23 @@ using FlightRecorderCUDA = FlightRecorder<at::cuda::CUDAEvent>;
namespace {
// Captures stack trace
extern "C" void __cxa_throw(void* thrown_exception,
std::type_info* tinfo,
void (*dest)(void*)) {
// Capture stack trace here
void* buffer[100];
int size = backtrace(buffer, 100);
LOG(ERROR) << "Exception thrown! Stack trace:\n";
backtrace_symbols_fd(buffer, size, 2);
// Call original throw
typedef void (*orig_cxa_throw_type)(void*, std::type_info*, void (*)(void*));
static orig_cxa_throw_type orig_cxa_throw = (orig_cxa_throw_type)dlsym(RTLD_NEXT, "__cxa_throw");
orig_cxa_throw(thrown_exception, tinfo, dest);
}
#if defined(NCCL_MAJOR) && \
((NCCL_MAJOR > 2) || (NCCL_MAJOR == 2) && (NCCL_MINOR >= 10))
#define NCCL_HAS_AVG 1
@@ -704,9 +725,9 @@ bool ProcessGroupNCCL::WorkNCCL::checkTimeout(
// Print the traceback of the collective at call time
std::string ProcessGroupNCCL::WorkNCCL::getTraceback() const {
// First step we get the corresponding record entry from FR, based on work's
// trace_id_
// trace_id_ and trace_reset_epoch_
std::optional<FlightRecorderCUDA::Entry> entry =
FlightRecorderCUDA::get()->getEntry(trace_id_);
FlightRecorderCUDA::get()->getEntry(trace_id_, trace_reset_epoch_);
if (entry.has_value()) {
auto entryVal = entry.value();
// Get stack trace from FR entry, in string format
@@ -2394,7 +2415,8 @@ void ProcessGroupNCCL::Watchdog::runLoop() {
pg_->pgStatus_->lastCompletedWorkName = opTypeToString(work.opType_);
pg_->pgStatus_->lastCompletedNumelIn = work.numelIn_;
pg_->pgStatus_->lastCompletedNumelOut = work.numelOut_;
FlightRecorderCUDA::get()->retire_id(work.trace_id_, true);
FlightRecorderCUDA::get()->retire_id(
work.trace_id_, work.trace_reset_epoch_, true);
if (pg_->onCompletionHook_) {
// Move Work object to completedWorkList_ to be consumed by the hook
// thread
@@ -3360,7 +3382,7 @@ c10::intrusive_ptr<ProcessGroupNCCL::WorkNCCL> ProcessGroupNCCL::initWork(
// these objects to the Work because it has implications for keeping those
// tensors alive longer and adds overhead when copying Work objects
// between threads
r->trace_id_ = FlightRecorderCUDA::get()->record(
auto traceId = FlightRecorderCUDA::get()->record(
local_id_,
std::make_tuple(pg_uid_, pg_desc_),
seqCollective_,
@@ -3374,6 +3396,8 @@ c10::intrusive_ptr<ProcessGroupNCCL::WorkNCCL> ProcessGroupNCCL::initWork(
options_->timeout,
pgStatus_,
isP2P);
r->trace_id_ = traceId.id;
r->trace_reset_epoch_ = traceId.reset_epoch;
}
return r;
}
@@ -4168,7 +4192,7 @@ c10::intrusive_ptr<Work> ProcessGroupNCCL::pointToPoint(
// TODO(whc) because we don't pass output {tensor} to initWork, we tell
// initWork to not record, and then we manually call record passing all the
// information it wants.
work->trace_id_ = FlightRecorderCUDA::get()->record(
auto traceId = FlightRecorderCUDA::get()->record(
local_id_,
std::make_tuple(pg_uid_, pg_desc_),
seqCollective_,
@@ -4182,6 +4206,8 @@ c10::intrusive_ptr<Work> ProcessGroupNCCL::pointToPoint(
options_->timeout,
pgStatus_,
/*isP2P=*/true);
work->trace_id_ = traceId.id;
work->trace_reset_epoch_ = traceId.reset_epoch;
}
// Only check for NaN for send ops, for recv ops `tensor` can be a random

View File

@@ -505,6 +505,7 @@ class TORCH_API ProcessGroupNCCL : public Backend {
// unique id used to tell the trace buffer that this
// work has completed
std::optional<uint64_t> trace_id_;
std::optional<uint64_t> trace_reset_epoch_;
DebugLevel distDebugLevel_;
friend class ProcessGroupNCCL;
};