Files
pytorch/torch/csrc/distributed/c10d/HashStore.hpp
Tristan Rice 98c892749b c10d/Store: add nonblocking mode to queue_pop (#151485)
This adds a non-blocking mode to queue_pop, which lets workers poll for ready work without blocking the main loop. This is useful when you want to keep a GPU at maximum utilization while items are only periodically sent on the queue.

We also expose a `torch.distributed.QueueEmptyError` so users can catch the error and handle it accordingly.

Test plan:

```
pytest test/distributed/test_store.py -k queue -v -s -x
```

Pull Request resolved: https://github.com/pytorch/pytorch/pull/151485
Approved by: https://github.com/fduwjj, https://github.com/tianfengfrank
2025-04-18 02:14:50 +00:00

80 lines
2.1 KiB
C++

#pragma once
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>

#include <torch/csrc/distributed/c10d/Store.hpp>
namespace c10d {
/// A `Store` whose state is held in this object's own members: a map from key
/// to byte buffer (`map_`), a map from key to a deque of byte buffers
/// (`queues_`), a mutex (`m_`) and a condition variable (`cv_`).
///
/// Apart from the one-argument `wait`, only declarations are visible in this
/// header; the behavior of each operation is defined by its out-of-line
/// definition and by the `Store` base-class contract it overrides.
///
/// NOTE(review): presumably `m_` guards `map_` and `queues_`, and `cv_` is
/// what blocking calls (`wait`, blocking `queuePop`) sleep on -- confirm
/// against the implementation file.
class TORCH_API HashStore : public Store {
 public:
  /// Overrides `Store::clone`; returns an intrusive pointer to a `Store`.
  c10::intrusive_ptr<Store> clone() override;

  ~HashStore() override = default;

  // ---- Basic key/value operations (overrides of `Store`) ----

  void set(const std::string& key, const std::vector<uint8_t>& data) override;

  /// @param key            key to operate on
  /// @param expectedValue  value the caller expects to be stored
  /// @param desiredValue   value the caller wants to store
  /// @return a byte buffer; which value is returned in each case is defined
  ///         by the implementation (not visible here).
  std::vector<uint8_t> compareSet(
      const std::string& key,
      const std::vector<uint8_t>& expectedValue,
      const std::vector<uint8_t>& desiredValue) override;

  std::vector<uint8_t> get(const std::string& key) override;

  /// Convenience overload: forwards to `wait(keys, timeout)` passing
  /// `timeout_` as the timeout.
  void wait(const std::vector<std::string>& keys) override {
    wait(keys, timeout_);
  }

  /// @param keys     keys to wait on
  /// @param timeout  time budget for the wait, in milliseconds
  void wait(
      const std::vector<std::string>& keys,
      const std::chrono::milliseconds& timeout) override;

  int64_t add(const std::string& key, int64_t value) override;

  int64_t getNumKeys() override;

  bool check(const std::vector<std::string>& keys) override;

  bool deleteKey(const std::string& key) override;

  // ---- Extended API: append, multiGet and multiSet ----

  void append(const std::string& key, const std::vector<uint8_t>& value)
      override;

  std::vector<std::vector<uint8_t>> multiGet(
      const std::vector<std::string>& keys) override;

  void multiSet(
      const std::vector<std::string>& keys,
      const std::vector<std::vector<uint8_t>>& values) override;

  // Returns true if this store supports append, multiGet and multiSet
  bool hasExtendedApi() const override;

  // ---- Queue operations ----

  void queuePush(const std::string& key, const std::vector<uint8_t>& value)
      override;

  /// @param key    queue to pop from
  /// @param block  selects between the blocking and the non-blocking mode
  /// NOTE(review): what a non-blocking pop does on an empty queue (e.g. which
  /// error it raises) is decided by the implementation -- confirm there.
  std::vector<uint8_t> queuePop(const std::string& key, bool block) override;

  int64_t queueLen(const std::string& key) override;

 protected:
  // Helpers that receive the caller's `std::unique_lock`.
  // NOTE(review): presumably the lock must already be held on `m_` when these
  // are called -- confirm against the implementation.
  bool checkLocked(
      const std::unique_lock<std::mutex>& lock,
      const std::vector<std::string>& keys);

  void waitLocked(
      std::unique_lock<std::mutex>& lock,
      const std::vector<std::string>& keys,
      const std::chrono::milliseconds& timeout);

  // State. Declaration order is kept as-is so the object layout is unchanged.
  std::unordered_map<std::string, std::vector<uint8_t>> map_;
  std::unordered_map<std::string, std::deque<std::vector<uint8_t>>> queues_;
  std::mutex m_;
  std::condition_variable cv_;
};
} // namespace c10d