mirror of
https://github.com/pytorch/pytorch.git
synced 2025-10-20 21:14:14 +08:00
This adds queue operations as described in https://github.com/pytorch/pytorch/issues/150943. This works by adding two new operations `queue_push` and `queue_pop`. The semantics are designed to be blocking with a timeout. Pushing will always succeed as the queue is infinite size. Popping will first call `wait` until the key is ready and then pop the value from the queue. This implements queues for only: HashStore, TCPStore w/ libuv. FileStore and the legacy backends are not supported. `wait` and `check` work for queue operations though queue_push will only wake up the first waiter rather than all of them. This also has a few cleanups to error types/documentation in related code. Example trace: ``` [I409 16:51:43.963833529 TCPStoreLibUvBackend.cpp:829] [c10d - trace] validate magic:1015412686 address:[localhost]:55816 [I409 16:51:43.963845838 TCPStoreLibUvBackend.cpp:842] [c10d - trace] ping nonce:2840795 address:[localhost]:55816 [I409 16:51:43.963902914 TCPStoreLibUvBackend.cpp:911] [c10d - trace] add key:init/ val:1 address:[localhost]:55816 [I409 16:51:43.963939389 TCPStoreLibUvBackend.cpp:977] [c10d - trace] wait key_count:1 keys[0]:init/ address:[localhost]:55816 [I409 16:51:43.963974842 TCPStoreLibUvBackend.cpp:893] [c10d - trace] get key:init/ address:[localhost]:55816 [I409 16:51:43.964071909 TCPStoreLibUvBackend.cpp:1121] [c10d - trace] queue_push key:/test_prefix/test_queue_support address:[localhost]:55816 [I409 16:51:43.964080221 TCPStoreLibUvBackend.cpp:940] [c10d - trace] check key_count:1 keys[0]:/test_prefix/foo address:[localhost]:55816 [I409 16:51:43.964108584 TCPStoreLibUvBackend.cpp:1121] [c10d - trace] queue_push key:/test_prefix/foo address:[localhost]:55816 [I409 16:51:43.964123207 TCPStoreLibUvBackend.cpp:1121] [c10d - trace] queue_push key:/test_prefix/foo address:[localhost]:55816 [I409 16:51:43.964128194 TCPStoreLibUvBackend.cpp:940] [c10d - trace] check key_count:1 keys[0]:/test_prefix/foo address:[localhost]:55816 [I409 
16:51:43.964156347 TCPStoreLibUvBackend.cpp:977] [c10d - trace] wait key_count:1 keys[0]:/test_prefix/foo address:[localhost]:55816 [I409 16:51:43.964187493 TCPStoreLibUvBackend.cpp:977] [c10d - trace] wait key_count:1 keys[0]:/test_prefix/foo address:[localhost]:55816 [I409 16:51:43.964217709 TCPStoreLibUvBackend.cpp:1133] [c10d - trace] queue_pop key:/test_prefix/foo address:[localhost]:55816 [I409 16:51:43.964324300 TCPStoreLibUvBackend.cpp:977] [c10d - trace] wait key_count:1 keys[0]:/test_prefix/foo address:[localhost]:55816 [I409 16:51:43.964354495 TCPStoreLibUvBackend.cpp:1133] [c10d - trace] queue_pop key:/test_prefix/foo address:[localhost]:55816 [I409 16:51:43.964416299 TCPStoreLibUvBackend.cpp:940] [c10d - trace] check key_count:1 keys[0]:/test_prefix/foo address:[localhost]:55816 [I409 16:51:43.964458733 TCPStoreLibUvBackend.cpp:977] [c10d - trace] wait key_count:1 keys[0]:/test_prefix/non_existant address:[localhost]:55816 [W409 16:51:43.974516585 socket.cpp:460] [c10d] waitForInput: poll for socket SocketImpl(fd=75, addr=[localhost]:55816, remote=[localhost]:46641) returned 0, likely a timeout [W409 16:51:43.974559169 socket.cpp:485] [c10d] waitForInput: socket SocketImpl(fd=75, addr=[localhost]:55816, remote=[localhost]:46641) timed out after 10ms [I409 16:51:43.974600451 TCPStoreLibUvBackend.cpp:1101] [c10d - trace] cancel_wait address:[localhost]:55816 ``` Test plan: ``` $ pytest test/distributed/test_store.py -k queue -v -s test/distributed/test_store.py::FileStoreTest::test_queues SKIPPED [0.4351s] (Store does not support queues) test/distributed/test_store.py::HashStoreTest::test_queues PASSED [0.0009s] test/distributed/test_store.py::PrefixFileStoreTest::test_queues SKIPPED [0.0006s] (Store does not support queues) test/distributed/test_store.py::TCPStoreTest::test_queues SKIPPED [0.0012s] (Store does not support queues) test/distributed/test_store.py::LibUvTCPStoreTest::test_queues PASSED [0.0014s] 
test/distributed/test_store.py::PrefixTCPStoreTest::test_queues PASSED [0.0014s] ``` Pull Request resolved: https://github.com/pytorch/pytorch/pull/150969 Approved by: https://github.com/XilunWu, https://github.com/fduwjj
170 lines
4.9 KiB
C++
170 lines
4.9 KiB
C++
#pragma once
|
|
|
|
#include <cstddef>
|
|
#include <cstdint>
|
|
#include <memory>
|
|
|
|
#include <torch/csrc/distributed/c10d/Store.hpp>
|
|
|
|
namespace c10d {
|
|
namespace detail {
|
|
|
|
// TCPStore is a key-value store used by PyTorch mainly for distributed
|
|
// rendezvous, but for other purposes as well. (e.g., a centralized storage for
|
|
// synchronization among different processes.)
|
|
//
|
|
// It is run via a classic client-server architecture, where the server runs
|
|
// a separate background thread (alternatively we call it daemon thread). The
|
|
// client and server communicate via TCP sockets.
|
|
//
|
|
// Currently we have two types of server backends:
|
|
// 1. TCPStoreBackend: a single thread to handle all incoming requests
|
|
// synchronously.
|
|
// 2. LibUVTCPStoreBackend: an event-driven asynchronous stream processing that
|
|
// leverages libuv library (https://github.com/libuv/libuv) for better
|
|
// performance. This backend is now recommended to users. (We set the
|
|
// default value of `useLibUV` inside `TCPStoreOptions` to true now, so users
|
|
// should get it by default).
|
|
//
|
|
// Code structure:
|
|
// ├── TCPStore client side API and server setup code:
|
|
// │ TCPStore.hpp/TCPStore.cpp
|
|
// ├── TCPStoreBackend server side API implementation code:
|
|
// │ TCPStoreBackend.hpp/TCPStoreBackend.cpp
|
|
// │ (actual class: `TCPStoreMasterDaemon`)
|
|
// ├── LibUVTCPStoreBackend
|
|
// │ TCPStoreLibUvBackend.cpp
|
|
// │ (actual class: `LibUVStoreDaemon`)
|
|
|
|
// Forward declarations of the server- and client-side connection types.
// Only pointers to these types are needed in this header (see the smart
// pointer members of TCPStore), so their definitions live elsewhere.
class TCPServer;

class TCPClient;
|
|
|
|
// Host/port pair identifying a TCP endpoint.
// Both fields are value-initialized: `host` starts empty and `port` starts
// at 0.
struct SocketAddress {
  // Host name or address string of the endpoint.
  std::string host{};
  // TCP port number of the endpoint.
  std::uint16_t port{};
};
|
|
|
|
} // namespace detail
|
|
|
|
struct TCPStoreOptions {
|
|
static constexpr std::uint16_t kDefaultPort = 29500;
|
|
|
|
std::uint16_t port = kDefaultPort;
|
|
bool isServer = false;
|
|
std::optional<std::size_t> numWorkers = std::nullopt;
|
|
bool waitWorkers = true;
|
|
std::chrono::milliseconds timeout = Store::kDefaultTimeout;
|
|
|
|
// A boolean value indicating whether multiple store instances can be
|
|
// initialized with the same host:port pair.
|
|
bool multiTenant = false;
|
|
|
|
// If specified, and if isServer is true, the underlying TCPServer will take
|
|
// over the bound socket associated to this fd. This option is useful to avoid
|
|
// port assignment races in certain scenarios.
|
|
std::optional<int> masterListenFd = std::nullopt;
|
|
|
|
// A boolean value indicating whether to use the experimental libUV backend.
|
|
bool useLibUV = true;
|
|
};
|
|
|
|
// TCP-backed implementation of the c10d `Store` interface.
//
// An instance is constructed from a host name plus a TCPStoreOptions value.
// It holds a client connection (`client_`) and, optionally, a handle to a
// server (`server_`). Most public member functions are overrides of the
// base `Store` API; their contracts are defined by `Store`.
class TORCH_API TCPStore : public Store {
 public:
  // Delay between connection retries.
  // NOTE(review): usage is in TCPStore.cpp -- confirm how it is applied.
  static constexpr std::chrono::milliseconds kConnectRetryDelay{1000};

  // Creates a store that talks to `host` using the settings in `opts`.
  explicit TCPStore(std::string host, const TCPStoreOptions& opts = {});

  ~TCPStore() override;

  c10::intrusive_ptr<Store> clone() override;

  // ---- Basic key/value operations (Store overrides) ----

  void set(const std::string& key, const std::vector<uint8_t>& value) override;

  std::vector<uint8_t> compareSet(
      const std::string& key,
      const std::vector<uint8_t>& expectedValue,
      const std::vector<uint8_t>& desiredValue) override;

  std::vector<uint8_t> get(const std::string& key) override;

  int64_t add(const std::string& key, int64_t value) override;

  bool deleteKey(const std::string& key) override;

  bool check(const std::vector<std::string>& keys) override;

  int64_t getNumKeys() override;

  // Waits for `keys` using the store's configured timeout.
  // NOTE(review): timeout source assumed from the two-overload shape --
  // confirm in TCPStore.cpp.
  void wait(const std::vector<std::string>& keys) override;

  // Waits for `keys`, giving up after the caller-supplied `timeout`.
  void wait(
      const std::vector<std::string>& keys,
      const std::chrono::milliseconds& timeout) override;

  // ---- Extended API (Store overrides) ----

  void append(const std::string& key, const std::vector<uint8_t>& value)
      override;

  std::vector<std::vector<uint8_t>> multiGet(
      const std::vector<std::string>& keys) override;

  void multiSet(
      const std::vector<std::string>& keys,
      const std::vector<std::vector<uint8_t>>& values) override;

  bool hasExtendedApi() const override;

  // ---- Queue operations (Store overrides) ----
  // NOTE(review): per the change that introduced them, TCPStore queues are
  // implemented only for the libuv backend; pushes always succeed (the queue
  // is unbounded) and pops block, subject to a timeout, until the key is
  // ready. Confirm details in TCPStore.cpp.

  void queuePush(const std::string& key, const std::vector<uint8_t>& value)
      override;

  std::vector<uint8_t> queuePop(const std::string& key) override;

  int64_t queueLen(const std::string& key) override;

  // ---- TCPStore-specific API ----

  // Waits for all workers to join.
  void waitForWorkers();

  // Returns the hostname used by the TCPStore.
  const std::string& getHost() const noexcept {
    return addr_.host;
  }

  // Returns the port used by the TCPStore.
  std::uint16_t getPort() const noexcept {
    return addr_.port;
  }

  // Returns whether this store is using the libuv backend (the value of
  // `usingLibUv_`).
  bool isLibUvBackend() const noexcept {
    return usingLibUv_;
  }

  // note(xilunwu): this function is only for internal testing
  void _splitSet(const std::string& key, const std::vector<uint8_t>& data);

  // Returns a string representation of this store.
  std::string repr() const;

 private:
  // NOTE(review): presumably the shared implementation behind `add` --
  // confirm in TCPStore.cpp.
  int64_t incrementValueBy(const std::string& key, int64_t delta);

  // NOTE(review): the server trace in the change description shows a
  // "validate" (magic number) and a "ping" (nonce) request on connection --
  // presumably issued by these helpers; confirm in TCPStore.cpp.
  void ping();
  void validate();

  // NOTE(review): presumably the internal implementations behind `get` and
  // `wait` -- confirm in TCPStore.cpp.
  std::vector<uint8_t> doGet(const std::string& key);

  void doWait(
      c10::ArrayRef<std::string> keys,
      std::chrono::milliseconds timeout);

  // Host/port of the store; exposed through getHost() / getPort().
  detail::SocketAddress addr_;
  // Server handle; shared ownership.
  // NOTE(review): presumably null unless this instance hosts the server.
  std::shared_ptr<detail::TCPServer> server_;
  // Client connection owned exclusively by this store.
  std::unique_ptr<detail::TCPClient> client_;
  // Expected number of workers, if known.
  std::optional<std::size_t> numWorkers_;

  // NOTE(review): presumably the key used to count joined workers and the
  // prefix applied to user keys (trace keys in the change description are
  // "init/" and "/..."); confirm in TCPStore.cpp.
  const std::string initKey_ = "init/";
  const std::string keyPrefix_ = "/";
  // NOTE(review): presumably serializes in-flight client operations --
  // confirm against its lock sites.
  std::mutex activeOpLock_;
  // Whether the libuv backend is in use; read by isLibUvBackend().
  bool usingLibUv_ = true;
};
|
|
|
|
} // namespace c10d
|