add a new Guage API with an empty backend to PyTorch core (#134883)

Summary:
The current use case is to continuously measure the total allocated and reserved CUDA memory size from CUDACachingAllocator, and export their distribution (min, max, p90 etc) over time as timeseries.

The current callback-based API does not work because the backend decides when the measurement is taken, so data points between two measurements may not be recorded. The distribution (e.g. max) as such will not be accurate.

This new API closely follow the design of the existing WaitCounter API otherwise.

This is not quite a synchronous version of DynamicCounter, as summing multiple data points does not make sense to my use case

Test Plan: CI

Differential Revision: D61837528

Pull Request resolved: https://github.com/pytorch/pytorch/pull/134883
Approved by: https://github.com/c-p-i-o
This commit is contained in:
Haibo Chen
2024-09-03 17:08:45 +00:00
committed by PyTorch MergeBot
parent 7804c089c6
commit 2e0b114c06
2 changed files with 127 additions and 0 deletions

79
c10/util/Gauge.cpp Normal file
View File

@ -0,0 +1,79 @@
#include <c10/util/Gauge.h>
#include <c10/util/Synchronized.h>
#include <memory>
#include <string>
#include <string_view>
#include <unordered_map>
#include <vector>
namespace c10::monitor {
namespace detail {
namespace {
using GaugeBackendFactories =
std::vector<std::shared_ptr<GaugeBackendFactoryIf>>;
Synchronized<GaugeBackendFactories>& gaugeBackendFactories() {
static auto instance = new Synchronized<GaugeBackendFactories>();
return *instance;
}
} // namespace
class GaugeImpl {
public:
static GaugeImpl& getInstance(std::string_view key) {
static auto& implMapSynchronized = *new Synchronized<
std::unordered_map<std::string, std::unique_ptr<GaugeImpl>>>();
return *implMapSynchronized.withLock([&](auto& implMap) {
if (auto implIt = implMap.find(std::string(key));
implIt != implMap.end()) {
return implIt->second.get();
}
auto [implIt, emplaceSuccess] = implMap.emplace(
std::string{key}, std::unique_ptr<GaugeImpl>(new GaugeImpl(key)));
assert(emplaceSuccess);
return implIt->second.get();
});
}
void record(int64_t value) {
for (auto& backend : backends_) {
backend->record(value);
}
}
private:
explicit GaugeImpl(std::string_view key) {
auto factoriesCopy = gaugeBackendFactories().withLock(
[](auto& factories) { return factories; });
for (const auto& factory : factoriesCopy) {
if (auto backend = factory->create(key)) {
backends_.push_back(std::move(backend));
}
}
}
SmallVector<std::unique_ptr<GaugeBackendIf>> backends_;
};
void registerGaugeBackend(std::unique_ptr<GaugeBackendFactoryIf> backend) {
gaugeBackendFactories().withLock(
[&](auto& backends) { backends.push_back(std::move(backend)); });
}
} // namespace detail
GaugeHandle::GaugeHandle(std::string_view key)
: impl_(detail::GaugeImpl::getInstance(key)) {}
void GaugeHandle::record(int64_t value) {
impl_.record(value);
}
} // namespace c10::monitor

48
c10/util/Gauge.h Normal file
View File

@ -0,0 +1,48 @@
#pragma once
#include <memory>
#include <string_view>
#include <c10/macros/Macros.h>
#include <c10/util/SmallVector.h>
namespace c10::monitor {
namespace detail {
class GaugeImpl;
class GaugeBackendIf {
public:
virtual ~GaugeBackendIf() = default;
virtual void record(int64_t value) noexcept = 0;
};
class GaugeBackendFactoryIf {
public:
virtual ~GaugeBackendFactoryIf() = default;
// May return nullptr if the gauge will be ignored by the given backend.
virtual std::unique_ptr<GaugeBackendIf> create(
std::string_view key) noexcept = 0;
};
void C10_API registerGaugeBackend(std::unique_ptr<GaugeBackendFactoryIf>);
} // namespace detail
// A handle to a Gauge.
class C10_API GaugeHandle {
public:
explicit GaugeHandle(std::string_view key);
void record(int64_t value);
private:
detail::GaugeImpl& impl_;
};
} // namespace c10::monitor
#define STATIC_GAUGE(_key) \
[]() -> ::c10::monitor::GaugeHandle& { \
static ::c10::monitor::GaugeHandle handle(#_key); \
return handle; \
}()