Fixing RPC Shutdown and Thread Joining (#36239)

Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/36239

ProcessGroupAgent and ThriftAgent threads were joined at shutdown, but RpcAgent threads were joined by the destructor. This PR joins all threads at shutdown by using a pattern similar to `start` in RPC.

The derived classes implement a `shutdownImpl` class that cleans up backend-specific state. RpcAgent implements `shutdown` which cleans up generic state and calls the underlying `shutdownImpl`. The atomic running is now set and unset by RpcAgent so backends do not need to mutate it.
ghstack-source-id: 101820415

Test Plan: Ensured this works with `test_duplicate_name` (in which RpcAgent is constructed but PGA is not), and selected `rpc_spawn` and `dist_autograd_spawn` tests with TSAN. Checking Build Bot and CI as well, and continuing to test more with TSAN on devserver (currently running into memory issues).

Reviewed By: jjlilley

Differential Revision: D20902666

fbshipit-source-id: 5dbb5fc92ba66f75614c050bb10b10810770ab12
This commit is contained in:
Omkar Salpekar
2020-04-09 12:01:57 -07:00
committed by Facebook GitHub Bot
parent 9497b21e63
commit 264da24c9e
4 changed files with 27 additions and 22 deletions

View File

@ -23,7 +23,9 @@ RpcAgent::RpcAgent(
rpcAgentRunning_(false) {}
RpcAgent::~RpcAgent() {
cleanup();
if (rpcAgentRunning_.load()) {
shutdown();
}
}
void RpcAgent::start() {
@ -32,14 +34,15 @@ void RpcAgent::start() {
startImpl();
}
void RpcAgent::cleanup() {
void RpcAgent::shutdown() {
std::unique_lock<std::mutex> lock(rpcRetryMutex_);
rpcAgentRunning_.store(false);
// We must notify the condition variable so it stops waiting in the
// retry thread, otherwise this thread cannot be joined.
lock.unlock();
rpcRetryMapCV_.notify_one();
if (rpcRetryThread_.joinable()) {
rpcRetryThread_.join();
}
shutdownImpl();
}
std::shared_ptr<FutureMessage> RpcAgent::sendWithRetries(