[PyTorch Pinned Allocator] Add support of background thread to process events (#135524)

Summary: Currently we process events in the regular allocation path and we call cudaEventQuery to check on the events and this path can take some locks in libcuda driver. Its not entirely needed to do process events in the allocation path, we could move this to a background thread and keep processing events regularly and put the freed block to the free list.

Differential Revision: D62396585

Pull Request resolved: https://github.com/pytorch/pytorch/pull/135524
Approved by: https://github.com/zyan0
This commit is contained in:
Banit Agrawal
2024-09-17 21:08:10 +00:00
committed by PyTorch MergeBot
parent 48d18fbd4c
commit a575ce0dc6
5 changed files with 155 additions and 37 deletions

View File

@ -1,4 +1,6 @@
#include <c10/core/Allocator.h>
#include <c10/core/thread_pool.h>
#include <c10/util/CallOnce.h>
#include <c10/util/flat_hash_map.h>
#include <c10/util/llvmMathExtras.h>
#include <optional>
@ -109,6 +111,17 @@ template <
typename E,
typename B = HostBlock<S>>
struct CachingHostAllocatorImpl {
CachingHostAllocatorImpl() {
// Launch the background thread and process events in a loop.
if (pinned_use_background_threads()) {
getBackgroundThreadPool()->run([&]() {
while (true) {
process_events();
std::this_thread::sleep_for(std::chrono::microseconds(100));
}
});
}
}
virtual ~CachingHostAllocatorImpl() = default;
public:
@ -118,17 +131,34 @@ struct CachingHostAllocatorImpl {
return {nullptr, nullptr};
}
process_events();
// First, try to allocate from the free list
auto* block = get_free_block(size);
if (block) {
return {block->ptr_, reinterpret_cast<void*>(block)};
// If we are using background threads, we can process events in the
// background.
if (!pinned_use_background_threads()) {
process_events();
}
// Round up the allocation to the nearest power of two to improve reuse.
// These power of two sizes are also used to index into the free list.
size_t roundSize = c10::llvm::PowerOf2Ceil(size);
// First, try to allocate from the free list
auto* block = get_free_block(roundSize);
if (block) {
return {block->ptr_, reinterpret_cast<void*>(block)};
}
// Check in the recently freed blocks with pending events to see if we
// can reuse them. Call get_free_block again after processing events
if (pinned_use_background_threads()) {
process_events_for_specific_size(roundSize);
block = get_free_block(roundSize);
if (block) {
return {block->ptr_, reinterpret_cast<void*>(block)};
}
}
// Slow path: if we can't allocate from the cached free list, we need
// to create a new block.
void* ptr = nullptr;
allocate_host_memory(roundSize, &ptr);
@ -237,6 +267,10 @@ struct CachingHostAllocatorImpl {
return c10::llvm::Log2_64_Ceil(size);
}
virtual bool pinned_use_background_threads() {
return false;
}
virtual void copy_data(void* dest [[maybe_unused]], const void* src [[maybe_unused]], std::size_t count [[maybe_unused]]) const {
TORCH_CHECK_NOT_IMPLEMENTED(false, "Not implemented for copy_data");
}
@ -261,6 +295,21 @@ struct CachingHostAllocatorImpl {
}
virtual void process_events() {
// process all events until the last unready event, not for specific size.
process_events_for_specific_size(-1);
}
// If size is -1, process all events from backwards until the last unready
// event. Otherwise, process events for a specific size and on first ready block
// is found, add it to the free list and return.
virtual void process_events_for_specific_size(int64_t size) {
size_t event_count = 0;
size_t max_events = 0;
{
std::lock_guard<std::mutex> g(events_mutex_);
max_events = events_.size();
}
while (true) {
// Avoid calling cudaEventDestroy while holding a mutex, so move
// intermediate events out of the lock into this object.
@ -278,6 +327,25 @@ struct CachingHostAllocatorImpl {
return;
}
if (size != -1) {
if (event_count++ > max_events) {
{
std::lock_guard<std::mutex> g(events_mutex_);
events_.push_front(std::move(*processed));
}
return;
}
if (size != (int64_t)processed->second->size_) {
// if we are processing a specific size, and the size of the block
// doesn't match, we can't use it.
{
std::lock_guard<std::mutex> g(events_mutex_);
events_.push_front(std::move(*processed));
}
continue;
}
}
// otherwise, query the event
{
// now, see if we can handle this element
@ -286,9 +354,14 @@ struct CachingHostAllocatorImpl {
// push the event onto the back if it's not ready.
{
std::lock_guard<std::mutex> g(events_mutex_);
events_.push_back(std::move(*processed));
if (size == -1) {
events_.push_back(std::move(*processed));
return;
} else {
events_.push_front(std::move(*processed));
continue;
}
}
return;
}
}
@ -309,46 +382,54 @@ struct CachingHostAllocatorImpl {
auto index = size_index(block->size_);
std::lock_guard<std::mutex> g(free_list_[index].mutex_);
free_list_[index].list_.push_back(block);
if (size != -1) {
return;
}
}
}
}
/* These following functions are runtime-related. */
// Allocate page-locked memory on the host.
virtual void allocate_host_memory(size_t size, void** ptr) {
TORCH_CHECK_NOT_IMPLEMENTED(
false, "Not implemented for allocate_host_memory");
TaskThreadPool* getBackgroundThreadPool() {
static TaskThreadPool* pool = new TaskThreadPool(1);
return pool;
}
// Free block and release the pointer contained in block.
virtual void free_block(B* block) {
TORCH_CHECK_NOT_IMPLEMENTED(false, "Not implemented for free_block");
}
/* These following functions are runtime-related. */
// Record an event on stream and store event into events.
virtual void record_stream(std::optional<std::vector<E>>& events, S stream) {
TORCH_CHECK_NOT_IMPLEMENTED(false, "Not implemented for record_stream");
}
// Allocate page-locked memory on the host.
virtual void allocate_host_memory(size_t size, void** ptr) {
TORCH_CHECK_NOT_IMPLEMENTED(
false, "Not implemented for allocate_host_memory");
}
// Query event if it is completed.
virtual bool query_event(E& event) {
TORCH_CHECK_NOT_IMPLEMENTED(false, "Not implemented for query_event");
}
// Free block and release the pointer contained in block.
virtual void free_block(B* block) {
TORCH_CHECK_NOT_IMPLEMENTED(false, "Not implemented for free_block");
}
alignas(64) std::mutex blocks_mutex_;
ska::flat_hash_set<B*> blocks_; // block list
ska::flat_hash_map<void*, B*> ptr_to_block_;
// Record an event on stream and store event into events.
virtual void record_stream(std::optional<std::vector<E>>& events, S stream) {
TORCH_CHECK_NOT_IMPLEMENTED(false, "Not implemented for record_stream");
}
// We keep free list as a vector of free lists, one for each power of two
// size. This allows us to quickly find a free block of the right size.
// We use deque to store per size free list and guard the list with its own
// mutex.
alignas(64) std::vector<FreeBlockList<B>> free_list_ = std::vector<FreeBlockList<B>>(MAX_SIZE_INDEX);
// Query event if it is completed.
virtual bool query_event(E& event) {
TORCH_CHECK_NOT_IMPLEMENTED(false, "Not implemented for query_event");
}
alignas(64) std::mutex events_mutex_;
std::deque<std::pair<E, B*>> events_; // event queue paired with block
};
alignas(64) std::mutex blocks_mutex_;
ska::flat_hash_set<B*> blocks_; // block list
ska::flat_hash_map<void*, B*> ptr_to_block_;
// We keep free list as a vector of free lists, one for each power of two
// size. This allows us to quickly find a free block of the right size.
// We use deque to store per size free list and guard the list with its own
// mutex.
alignas(64) std::vector<FreeBlockList<B>> free_list_ = std::vector<FreeBlockList<B>>(MAX_SIZE_INDEX);
alignas(64) std::mutex events_mutex_;
std::deque<std::pair<E, B*>> events_; // event queue paired with block
};
template <typename T>
struct CachingHostAllocatorInterface : public at::Allocator {

View File

@ -123,6 +123,11 @@ struct CUDACachingHostAllocatorImpl
return true;
}
bool pinned_use_background_threads() override {
return c10::cuda::CUDACachingAllocator::CUDAAllocatorConfig::
pinned_use_background_threads();
}
EventPool::Event create_event_internal(DeviceIndex idx) {
// Leak the event pool to avoid shutdown issue.
static auto* event_pool = new EventPool();

View File

@ -18,6 +18,7 @@ CUDAAllocatorConfig::CUDAAllocatorConfig()
m_expandable_segments(false),
m_release_lock_on_cudamalloc(false),
m_pinned_use_cuda_host_register(false),
m_pinned_use_background_threads(false),
m_last_allocator_settings("") {
m_roundup_power2_divisions.assign(kRoundUpPowerOfTwoIntervals, 0);
}
@ -331,6 +332,9 @@ void CUDAAllocatorConfig::parseArgs(const char* env) {
} else if (config_item_view == "pinned_num_register_threads") {
i = parsePinnedNumRegisterThreads(config, i);
used_native_specific_option = true;
} else if (config_item_view == "pinned_use_background_threads") {
i = parsePinnedUseBackgroundThreads(config, i);
used_native_specific_option = true;
} else {
TORCH_CHECK(
false, "Unrecognized CachingAllocator option: ", config_item_view);
@ -388,6 +392,22 @@ size_t CUDAAllocatorConfig::parsePinnedNumRegisterThreads(
return i;
}
size_t CUDAAllocatorConfig::parsePinnedUseBackgroundThreads(
const std::vector<std::string>& config,
size_t i) {
consumeToken(config, ++i, ':');
if (++i < config.size()) {
TORCH_CHECK(
(config[i] == "True" || config[i] == "False"),
"Expected a single True/False argument for pinned_use_background_threads");
m_pinned_use_background_threads = (config[i] == "True");
} else {
TORCH_CHECK(
false, "Error, expecting pinned_use_background_threads value", "");
}
return i;
}
// General caching allocator utilities
void setAllocatorSettings(const std::string& env) {
CUDACachingAllocator::CUDAAllocatorConfig::instance().parseArgs(env.c_str());

View File

@ -46,6 +46,10 @@ class C10_CUDA_API CUDAAllocatorConfig {
return instance().m_pinned_num_register_threads;
}
static bool pinned_use_background_threads() {
return instance().m_pinned_use_background_threads;
}
static size_t pinned_max_register_threads() {
// Based on the benchmark results, we see better allocation performance
// with 8 threads. However on future systems, we may need more threads
@ -113,6 +117,9 @@ class C10_CUDA_API CUDAAllocatorConfig {
size_t parsePinnedNumRegisterThreads(
const std::vector<std::string>& config,
size_t i);
size_t parsePinnedUseBackgroundThreads(
const std::vector<std::string>& config,
size_t i);
std::atomic<size_t> m_max_split_size;
std::atomic<size_t> m_max_non_split_rounding_size;
@ -122,6 +129,7 @@ class C10_CUDA_API CUDAAllocatorConfig {
std::atomic<bool> m_expandable_segments;
std::atomic<bool> m_release_lock_on_cudamalloc;
std::atomic<bool> m_pinned_use_cuda_host_register;
std::atomic<bool> m_pinned_use_background_threads;
std::string m_last_allocator_settings;
std::mutex m_last_allocator_settings_mutex;
};

View File

@ -534,6 +534,10 @@ Available options:
allocation time of pinned memory. A good value for this option is 8 based on
benchmarking results.
`pinned_use_background_threads` option is a boolean flag to enable background thread
for processing events. This avoids any slow path associated with querying/processing of
events in the fast allocation path. This feature is disabled by default.
.. note::
Some stats reported by the