[c10d][TCPStore] make TCPStore server use libuv by default (#127957)

**Summary**
This PR switches the default TCPStore server backend to a new implementation that utilizes [`libuv`](https://github.com/libuv/libuv) for significantly lower initialization time and better scalability:
<img width="714" alt="image" src="https://github.com/pytorch/pytorch/assets/12968408/18503011-da5d-4104-8ba9-abc456438b02">

We hope this improvement would benefit users from a much shorter startup time in large-scale jobs. Eventually, we hope to fully replace the old TCPStore backend implementation with the libuv one.

**What it changes**
This PR changes the underlying TCPStore server backend to `libuv` if users don't explicitly specify to use the old TCPStore server. This change is not supposed to cause any user notice except significant faster TCPStore startup for large-scale jobs.

One thing to note is, we do not support the initialization approach where user passes in a socket for libuv backend. We plan to support it as a next step but we choose to disable it before fully testing. If you are initializing TCPStore in this approach, you can see the next section to remain using the old TCPStore server.

**Fallback/Remain using the old TCPStore server**
For users who want to stay with the old TCPStore backend, there're 3 ways:

1. If user is directly instantiating TCPStore object, user can pass in argument `use_libuv=False` to use the old TCPStore server backend e.g. `store = torch.distributed.TCPStore(..., use_libuv=False)`.
2. Or, specify the TCPStore backend option in `init_method` when calling default ProcessGroup init, e.g. `torch.distributed.init_process_group(..., init_method="{YOUR_RENDEZVOUS_METHOD}://{YOUR_HOSTNAME}:{YOUR_PORT}?use_libuv=0")`
3. Or, user can set environment variable `USE_LIBUV` to `"0"` when launching.

These 3 approach are in order of precedence. That being said, if user specifies `use_libuv=0` in `init_method` and also sets environment var `USE_LIBUV="1"`, the former will take effect and the TCPStore backend instantiated will be the old one instead of the one using libuv.

**Operating Systems Compatibility**
From the CI signals, we believe the new implementation has the same behavior as the old TCPStore server on all supported platforms. If you notice any behavior discrepancy, please file an issue with `oncall: distributed` label.

**Test Plan**
`pytest test/distributed/test_store.py`
<img width="2548" alt="image" src="https://github.com/pytorch/pytorch/assets/12968408/dc0aebeb-6d5a-4daa-b98c-e56bd39aa588">
note: `TestMultiThreadedWait::test_wait` is a broken test that has been there for some time.

`test/distributed/elastic/utils/distributed_test.py`
<img width="2558" alt="image" src="https://github.com/pytorch/pytorch/assets/12968408/a6a3266d-b798-41c4-94d2-152056a034f6">

**TODO**
1. Update the doc at

- https://pytorch.org/docs/stable/distributed.html#distributed-key-value-store
- https://pytorch.org/docs/stable/distributed.html#tcp-initialization

2. Make torch elastic rendezvous to use libuv TCPStore as well. See `torch/distributed/elastic/rendezvous/c10d_rendezvous_backend.py` cc @mrshenli @pritamdamania87 @zhaojuanmao @satgera @gqchen @aazzolini @osalpekar @jiayisuse @H-Huang @kwen2501 @awgu @penguinwu @fegin @wanchaol @fduwjj @wz337 @tianyu-l @wconstab @yf225 @chauhang @d4l3k @kurman
3. Test if libuv backend is okay with initialization with socket. Change `LibUvTCPStoreTest::test_take_over_listen_socket`.

**Test Plan**
`pytest test/distributed/test_store.py`
<img width="2548" alt="image" src="https://github.com/pytorch/pytorch/assets/12968408/dc0aebeb-6d5a-4daa-b98c-e56bd39aa588">
note: `TestMultiThreadedWait::test_wait` is a broken test that has been there for some time.

`test/distributed/elastic/utils/distributed_test.py`
<img width="2558" alt="image" src="https://github.com/pytorch/pytorch/assets/12968408/a6a3266d-b798-41c4-94d2-152056a034f6">

Differential Revision: [D58259591](https://our.internmc.facebook.com/intern/diff/D58259591)
Pull Request resolved: https://github.com/pytorch/pytorch/pull/127957
Approved by: https://github.com/kurman
ghstack dependencies: #127956
This commit is contained in:
Xilun Wu
2024-06-06 22:59:24 -07:00
committed by PyTorch MergeBot
parent 6c824cd9fb
commit 85758fa5ae
7 changed files with 123 additions and 23 deletions

View File

@ -265,13 +265,17 @@ class PrefixFileStoreTest(TestCase, StoreTestBase):
class TCPStoreTest(TestCase, StoreTestBase):
_use_libuv = False
def _create_store(self):
store = create_tcp_store()
store = create_tcp_store(use_libuv=self._use_libuv)
store.set_timeout(timedelta(seconds=300))
return store
def _create_store_with_ws(self, addr, world_size):
return create_tcp_store(addr, world_size, wait_for_workers=False)
return create_tcp_store(
addr, world_size, wait_for_workers=False, use_libuv=self._use_libuv
)
def test_address_already_in_use(self):
err_msg_reg = "^The server socket has failed to listen on any local "
@ -282,8 +286,14 @@ class TCPStoreTest(TestCase, StoreTestBase):
# Use noqa to silence flake8.
# Need to store in an unused variable here to ensure the first
# object is not destroyed before the second object is created.
store1 = dist.TCPStore(addr, port, 1, True) # noqa: F841
store2 = dist.TCPStore(addr, port, 1, True) # noqa: F841
store1 = dist.TCPStore(
addr, port, 1, True, use_libuv=self._use_libuv
) # noqa: F841
store2 = dist.TCPStore(
addr, port, 1, True, use_libuv=self._use_libuv
) # noqa: F841
self.assertEqual(store1.libuvBackend, self._use_libuv)
self.assertEqual(store2.libuvBackend, self._use_libuv)
@retry_on_connect_failures
def test_multitenancy(self):
@ -293,8 +303,14 @@ class TCPStoreTest(TestCase, StoreTestBase):
# Use noqa to silence flake8.
# Need to store in an unused variable here to ensure the first
# object is not destroyed before the second object is created.
store1 = dist.TCPStore(addr, port, 1, True, multi_tenant=True) # type: ignore[call-arg] # noqa: F841
store2 = dist.TCPStore(addr, port, 1, True, multi_tenant=True) # type: ignore[call-arg] # noqa: F841
store1 = dist.TCPStore(
addr, port, 1, True, multi_tenant=True, use_libuv=self._use_libuv
) # type: ignore[call-arg] # noqa: F841
store2 = dist.TCPStore(
addr, port, 1, True, multi_tenant=True, use_libuv=self._use_libuv
) # type: ignore[call-arg] # noqa: F841
self.assertEqual(store1.libuvBackend, self._use_libuv)
self.assertEqual(store2.libuvBackend, self._use_libuv)
@skip_if_win32()
@retry_on_connect_failures
@ -308,6 +324,7 @@ class TCPStoreTest(TestCase, StoreTestBase):
# We internally use a multi-tenant TCP store. Both PG and RPC should successfully
# initialize even when using the same socket address.
os.environ["USE_LIBUV"] = "1" if self._use_libuv else "0"
dist.init_process_group(
backend="gloo",
init_method="env://",
@ -325,6 +342,8 @@ class TCPStoreTest(TestCase, StoreTestBase):
rpc_backend_options=backend_opts,
)
del os.environ["USE_LIBUV"]
assert "USE_LIBUV" not in os.environ
rpc.shutdown()
dist.destroy_process_group()
@ -335,8 +354,16 @@ class TCPStoreTest(TestCase, StoreTestBase):
addr, port, *_ = listen_sock.getsockname()
listen_fd = listen_sock.detach()
store = dist.TCPStore(addr, port, 1, is_master=True, master_listen_fd=listen_fd)
store = dist.TCPStore(
addr,
port,
1,
is_master=True,
master_listen_fd=listen_fd,
use_libuv=self._use_libuv,
)
self.assertEqual(store.libuvBackend, self._use_libuv)
store.set("key", "value")
self.assertEqual(b"value", store.get("key"))
@ -374,7 +401,11 @@ class TCPStoreTest(TestCase, StoreTestBase):
def _create_client(self, index, addr, port, world_size):
client_store = dist.TCPStore(
addr, port, world_size=world_size, timeout=timedelta(seconds=10)
addr,
port,
world_size=world_size,
timeout=timedelta(seconds=10),
use_libuv=self._use_libuv,
)
self.assertEqual(b"value", client_store.get("key"))
client_store.set(f"new_key{index}", f"new_value{index}")
@ -388,6 +419,7 @@ class TCPStoreTest(TestCase, StoreTestBase):
def _multi_worker_helper(self, world_size):
addr = DEFAULT_HOSTNAME
server_store = self._create_store_with_ws(addr, world_size)
self.assertEqual(server_store.libuvBackend, self._use_libuv)
server_store.set("key", "value")
port = server_store.port
@ -403,6 +435,7 @@ class TCPStoreTest(TestCase, StoreTestBase):
def test_append(self):
store = self._create_store()
self.assertEqual(store.libuvBackend, self._use_libuv)
store.set("foo", "po")
store.append("foo", "tato")
store.append("bar", "po")
@ -412,12 +445,14 @@ class TCPStoreTest(TestCase, StoreTestBase):
def test_multi_set(self):
store = self._create_store()
self.assertEqual(store.libuvBackend, self._use_libuv)
store.multi_set(["foo", "bar"], ["po", "tato"])
self.assertEqual(b"po", store.get("foo"))
self.assertEqual(b"tato", store.get("bar"))
def test_multi_get(self):
store = self._create_store()
self.assertEqual(store.libuvBackend, self._use_libuv)
store.set("foo", "po")
store.set("bar", "tato")
v0, v1 = store.multi_get(["foo", "bar"])
@ -430,7 +465,14 @@ class TCPStoreTest(TestCase, StoreTestBase):
r"Timed out after \d+ seconds waiting for clients. \d+/\d+ clients joined.",
):
# world_size is 2 so it should timeout
dist.TCPStore("localhost", 0, 2, True, timeout=timedelta(seconds=2))
dist.TCPStore(
"localhost",
0,
2,
True,
timeout=timedelta(seconds=2),
use_libuv=self._use_libuv,
)
# when wait_for_workers is not set, then there should be no exception raised
dist.TCPStore(
@ -440,10 +482,13 @@ class TCPStoreTest(TestCase, StoreTestBase):
True,
timeout=timedelta(seconds=2),
wait_for_workers=False,
use_libuv=self._use_libuv,
)
class LibUvTCPStoreTest(TCPStoreTest):
_use_libuv = True
def _create_store(self):
store = create_tcp_store(use_libuv=True)
store.set_timeout(timedelta(seconds=300))
@ -454,6 +499,33 @@ class LibUvTCPStoreTest(TCPStoreTest):
addr, world_size, wait_for_workers=False, use_libuv=True
)
def test_take_over_listen_socket(self):
"""
override the take_over_listen_socket test in TCPStoreTest.
Reason: we have not thoroughly tested libuv TCPStore initialization using
open Socket so we decide to not support this use for now.
TODO (xilunwu): enable this use case
"""
listen_sock: socket.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listen_sock.bind(("localhost", 0))
addr, port, *_ = listen_sock.getsockname()
listen_fd = listen_sock.detach()
err_msg_reg = (
"^The libuv TCPStore backend does not support "
"initialization with an listen fd"
)
with self.assertRaisesRegex(NotImplementedError, err_msg_reg):
store = dist.TCPStore(
addr,
port,
1,
is_master=True,
master_listen_fd=listen_fd,
use_libuv=self._use_libuv,
)
class PrefixTCPStoreTest(TestCase, StoreTestBase):
def setUp(self):
@ -769,7 +841,7 @@ class TestPythonStore(TestCase):
class TestMultiThreadedWait(MultiThreadedTestCase):
# TODO: Use less hacky means of instantiating stores.
# TODO (xilunwu): Use less hacky means of instantiating stores.
# Note, stores accumulate values per test.
stores = [
dist.FileStore(tempfile.NamedTemporaryFile(delete=False).name, 1),
@ -777,9 +849,9 @@ class TestMultiThreadedWait(MultiThreadedTestCase):
dist.PrefixStore(
"pre", dist.FileStore(tempfile.NamedTemporaryFile(delete=False).name, 1)
),
create_tcp_store(),
create_tcp_store(use_libuv=False),
create_tcp_store(use_libuv=True),
dist.PrefixStore("pre", create_tcp_store()),
dist.PrefixStore("pre", create_tcp_store(use_libuv=False)),
dist.PrefixStore("pre", create_tcp_store(use_libuv=True)),
]
@ -872,7 +944,12 @@ class TimeoutTest(TestCase):
self.assertTrue(rank_res[1], "rank1")
class InitPgWithUvStore(TestCase):
class InitPgWithNonUvStore(TestCase):
"""
This test shows how to use the legacy TCPStore (non-libuv) backend since libuv is now
the default backend.
"""
def tearDown(self):
super().tearDown()
os.environ.pop("USE_LIBUV", None)
@ -885,13 +962,13 @@ class InitPgWithUvStore(TestCase):
"gloo",
rank=0,
world_size=1,
init_method=f"tcp://{DEFAULT_HOSTNAME}:{port}?use_libuv=1",
init_method=f"tcp://{DEFAULT_HOSTNAME}:{port}?use_libuv=0",
)
self._run_test()
def test_with_env_var(self):
port = common.find_free_port()
os.environ["USE_LIBUV"] = "1"
os.environ["USE_LIBUV"] = "0"
os.environ["MASTER_ADDR"] = DEFAULT_HOSTNAME
os.environ["MASTER_PORT"] = str(port)
dist.init_process_group("gloo", rank=0, world_size=1, init_method="env://")
@ -905,7 +982,7 @@ class InitPgWithUvStore(TestCase):
while isinstance(store, dist.PrefixStore):
store = store.underlying_store
self.assertTrue(isinstance(store, dist.TCPStore))
self.assertTrue(store.libuvBackend)
self.assertFalse(store.libuvBackend)
dist.destroy_process_group()

View File

@ -291,6 +291,17 @@ TCPStore::TCPStore(std::string host, const TCPStoreOptions& opts)
TORCH_CHECK(
::c10d::detail::is_libuv_tcpstore_backend_available(),
"use_libuv was requested but PyTorch was build without libuv support");
if (opts.masterListenFd.has_value()) {
// TODO(xilunwu): support this init method after testing
constexpr auto* msg =
"The libuv TCPStore backend does not support initialization with an listen fd. "
"Please switch to the legacy TCPStore by setting environment variable USE_LIBUV "
"to \"0\".";
C10D_ERROR(msg);
C10_THROW_ERROR(NotImplementedError, msg);
return;
}
}
Socket::initialize();

View File

@ -63,7 +63,7 @@ struct TCPStoreOptions {
std::optional<int> masterListenFd = c10::nullopt;
// A boolean value indicating whether to use the experimental libUV backend.
bool useLibUV = false;
bool useLibUV = true;
};
class TORCH_API TCPStore : public Store {
@ -158,7 +158,7 @@ class TORCH_API TCPStore : public Store {
const std::string keyPrefix_ = "/";
std::mutex activeOpLock_;
std::unordered_map<std::string, detail::Counter> clientCounters_;
bool usingLibUv_ = false;
bool usingLibUv_ = true;
};
} // namespace c10d

View File

@ -2,6 +2,7 @@
#include <deque>
#include <exception>
#include <memory>
#include <ostream>
#include <unordered_map>
#include <unordered_set>
#include <utility>

View File

@ -1391,6 +1391,7 @@ Arguments:
wait_for_workers (bool, optional): Whether to wait for all the workers to connect with the server store. This is only applicable when world_size is a fixed value. Default is True.
multi_tenant (bool, optional): If True, all ``TCPStore`` instances in the current process with the same host/port will use the same underlying ``TCPServer``. Default is False.
master_listen_fd (int, optional): If specified, the underlying ``TCPServer`` will listen on this file descriptor, which must be a socket already bound to ``port``. Useful to avoid port assignment races in some scenarios. Default is None (meaning the server creates a new socket and attempts to bind it to ``port``).
use_libuv (bool, optional): If True, use libuv for ``TCPServer`` backend. Default is True.
Example::
>>> import torch.distributed as dist
>>> from datetime import timedelta
@ -1440,7 +1441,7 @@ Example::
py::arg("wait_for_workers") = true,
py::arg("multi_tenant") = false,
py::arg("master_listen_fd") = py::none(),
py::arg("use_libuv") = false,
py::arg("use_libuv") = true,
py::call_guard<py::gil_scoped_release>())
.def(
"collect_client_counters",

View File

@ -58,6 +58,12 @@ def _query_to_dict(query: str) -> Dict[str, str]:
return {pair[0]: pair[1] for pair in (pair.split("=") for pair in filter(None, query.split("&")))}
def _get_use_libuv_from_query_dict(query_dict: Dict[str, str]) -> bool:
# libuv is the default backend for TCPStore. To enable the non-libuv backend,
# user can explicitly specify ``use_libuv=0`` in the URL parameter.
return query_dict.get("use_libuv", os.environ.get("USE_LIBUV", "1")) == "1"
def _rendezvous_helper(url: str, rank: int, world_size_opt: Optional[int], **kwargs):
result = urlparse(url)
if world_size_opt is None:
@ -145,13 +151,16 @@ def _torchelastic_use_agent_store() -> bool:
return os.environ.get("TORCHELASTIC_USE_AGENT_STORE", None) == str(True)
def _create_c10d_store(hostname, port, rank, world_size, timeout, use_libuv=False) -> Store:
def _create_c10d_store(hostname, port, rank, world_size, timeout, use_libuv=True) -> Store:
"""
Smartly creates a c10d Store object on ``rank`` based on whether we need to re-use agent store.
The TCPStore server is assumed to be hosted
on ``hostname:port``.
By default, the TCPStore server uses the asynchronous implementation
``LibUVStoreDaemon`` which utilizes libuv.
If ``torchelastic_use_agent_store()`` is ``True``, then it is assumed that
the agent leader (node rank 0) hosts the TCPStore server (for which the
endpoint is specified by the given ``hostname:port``). Hence
@ -194,7 +203,8 @@ def _tcp_rendezvous_handler(
rank = int(query_dict["rank"])
world_size = int(query_dict["world_size"])
use_libuv = query_dict.get("use_libuv", "0") == "1"
use_libuv = _get_use_libuv_from_query_dict(query_dict)
assert result.hostname is not None
store = _create_c10d_store(result.hostname, result.port, rank, world_size, timeout, use_libuv)
@ -242,7 +252,7 @@ def _env_rendezvous_handler(
master_addr = _get_env_or_raise("MASTER_ADDR")
master_port = int(_get_env_or_raise("MASTER_PORT"))
use_libuv = query_dict.get("use_libuv", os.environ.get("USE_LIBUV", "0")) == "1"
use_libuv = _get_use_libuv_from_query_dict(query_dict)
store = _create_c10d_store(master_addr, master_port, rank, world_size, timeout, use_libuv)

View File

@ -357,7 +357,7 @@ def create_tcp_store(
timeout=timedelta(minutes=5),
wait_for_workers=True,
jit_class=False,
use_libuv=False
use_libuv=True,
):
"""
Creates a TCP store. Retries if the chosen port is already in use.