Files
pytorch/c10/util/Gauge.cpp
Haibo Chen 2e0b114c06 add a new Guage API with an empty backend to PyTorch core (#134883)
Summary:
The current use case is to continuously measure the total allocated and reserved CUDA memory size from CUDACachingAllocator, and export their distribution (min, max, p90 etc) over time as timeseries.

The current callback-based API does not work because the backend decides when the measurement is taken, so data points between two measurements may not be recorded. The distribution (e.g. max) as such will not be accurate.

This new API closely follow the design of the existing WaitCounter API otherwise.

This is not quite a synchronous version of DynamicCounter, as summing multiple data points does not make sense to my use case

Test Plan: CI

Differential Revision: D61837528

Pull Request resolved: https://github.com/pytorch/pytorch/pull/134883
Approved by: https://github.com/c-p-i-o
2024-09-03 17:08:47 +00:00

80 lines
2.0 KiB
C++

#include <c10/util/Gauge.h>
#include <c10/util/Synchronized.h>
#include <memory>
#include <string>
#include <string_view>
#include <unordered_map>
#include <vector>
namespace c10::monitor {
namespace detail {
namespace {
using GaugeBackendFactories =
std::vector<std::shared_ptr<GaugeBackendFactoryIf>>;
Synchronized<GaugeBackendFactories>& gaugeBackendFactories() {
static auto instance = new Synchronized<GaugeBackendFactories>();
return *instance;
}
} // namespace
class GaugeImpl {
public:
static GaugeImpl& getInstance(std::string_view key) {
static auto& implMapSynchronized = *new Synchronized<
std::unordered_map<std::string, std::unique_ptr<GaugeImpl>>>();
return *implMapSynchronized.withLock([&](auto& implMap) {
if (auto implIt = implMap.find(std::string(key));
implIt != implMap.end()) {
return implIt->second.get();
}
auto [implIt, emplaceSuccess] = implMap.emplace(
std::string{key}, std::unique_ptr<GaugeImpl>(new GaugeImpl(key)));
assert(emplaceSuccess);
return implIt->second.get();
});
}
void record(int64_t value) {
for (auto& backend : backends_) {
backend->record(value);
}
}
private:
explicit GaugeImpl(std::string_view key) {
auto factoriesCopy = gaugeBackendFactories().withLock(
[](auto& factories) { return factories; });
for (const auto& factory : factoriesCopy) {
if (auto backend = factory->create(key)) {
backends_.push_back(std::move(backend));
}
}
}
SmallVector<std::unique_ptr<GaugeBackendIf>> backends_;
};
void registerGaugeBackend(std::unique_ptr<GaugeBackendFactoryIf> backend) {
gaugeBackendFactories().withLock(
[&](auto& backends) { backends.push_back(std::move(backend)); });
}
} // namespace detail
GaugeHandle::GaugeHandle(std::string_view key)
: impl_(detail::GaugeImpl::getInstance(key)) {}
void GaugeHandle::record(int64_t value) {
impl_.record(value);
}
} // namespace c10::monitor