mirror of
https://github.com/pytorch/pytorch.git
synced 2025-10-20 21:14:14 +08:00
This adds a non-blocking mode to queue_pop. It allows workers to poll whether work is ready without blocking the main loop. This is useful when you want to keep a GPU at maximum utilization while work is only periodically sent on the queue. We also expose a `torch.distributed.QueueEmptyError` so users can catch the error and handle it accordingly. Test plan: ``` pytest test/distributed/test_store.py -k queue -v -s -x ``` Pull Request resolved: https://github.com/pytorch/pytorch/pull/151485 Approved by: https://github.com/fduwjj, https://github.com/tianfengfrank
76 lines
2.2 KiB
C++
76 lines
2.2 KiB
C++
#pragma once
|
|
|
|
#include <torch/csrc/distributed/c10d/Store.hpp>
|
|
|
|
namespace c10d {
|
|
|
|
// A Store implementation that holds a string prefix together with another
// Store instance.
//
// NOTE(review): only declarations are visible in this header. Judging by the
// joinKey()/joinKeys() helpers, keys are presumably combined with `prefix_`
// before an operation is forwarded to the wrapped store -- confirm against
// the implementation file before relying on this.
class TORCH_API PrefixStore : public Store {
 public:
  // Builds a PrefixStore from a key prefix and the store to wrap.
  explicit PrefixStore(std::string prefix, c10::intrusive_ptr<Store> store);

  c10::intrusive_ptr<Store> clone() override;

  // Keep the remaining Store::set overloads visible; the override below would
  // otherwise hide them.
  using Store::set;
  void set(const std::string& key, const std::vector<uint8_t>& value) override;

  // Keep the remaining Store::compareSet overloads visible; the override below
  // would otherwise hide them.
  using Store::compareSet;
  std::vector<uint8_t> compareSet(
      const std::string& key,
      const std::vector<uint8_t>& expectedValue,
      const std::vector<uint8_t>& desiredValue) override;

  std::vector<uint8_t> get(const std::string& key) override;

  int64_t add(const std::string& key, int64_t value) override;

  bool deleteKey(const std::string& key) override;

  int64_t getNumKeys() override;

  bool check(const std::vector<std::string>& keys) override;

  void wait(const std::vector<std::string>& keys) override;

  void wait(
      const std::vector<std::string>& keys,
      const std::chrono::milliseconds& timeout) override;

  const std::chrono::milliseconds& getTimeout() const noexcept override;

  void setTimeout(const std::chrono::milliseconds& timeout) override;

  void append(const std::string& key, const std::vector<uint8_t>& value)
      override;

  std::vector<std::vector<uint8_t>> multiGet(
      const std::vector<std::string>& keys) override;

  void multiSet(
      const std::vector<std::string>& keys,
      const std::vector<std::vector<uint8_t>>& values) override;

  // Returns true if this store supports append, multiGet and multiSet.
  bool hasExtendedApi() const override;

  void queuePush(const std::string& key, const std::vector<uint8_t>& value)
      override;

  // NOTE(review): `block` presumably selects between a blocking and a
  // non-blocking pop; the exact behavior on an empty queue is defined by the
  // implementation and the wrapped store -- confirm there.
  std::vector<uint8_t> queuePop(const std::string& key, bool block) override;

  int64_t queueLen(const std::string& key) override;

  // NOTE(review): presumably returns the directly wrapped store (`store_`),
  // which may itself be a PrefixStore -- confirm against the definition.
  c10::intrusive_ptr<Store> getUnderlyingStore();

  // Recursively fetches the store that lies beneath all layers of PrefixStore
  // wrapping.
  c10::intrusive_ptr<Store> getUnderlyingNonPrefixStore();

 protected:
  // Prefix supplied at construction.
  std::string prefix_;
  // Store supplied at construction.
  c10::intrusive_ptr<Store> store_;

  // Helpers that map a caller-supplied key (or list of keys) to the key(s)
  // used with the wrapped store. NOTE(review): the exact joining format is
  // defined in the implementation file.
  std::string joinKey(const std::string& key);
  std::vector<std::string> joinKeys(const std::vector<std::string>& keys);
};
|
|
|
|
} // namespace c10d
|