[RPC] Add to confirmed users immediately if the fork is shared from owner, instead of adding nothing to pending users (#34988)

Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/34988

In https://github.com/pytorch/pytorch/pull/31893, we introduced a confirmedUsers_ map in RRefContext.

For the case that the fork is shared from the owner,  there is no `pendingUsers_` intermediate phase for this fork, we should put this fork into `confirmedUsers_` immediately.

Test Plan:
```
buck test mode/dev-nosan //caffe2/test/distributed/rpc:rpc_fork
```

```
buck test mode/dev-nosan //caffe2/test/distributed/rpc/jit:rpc_fork
```

Differential Revision: D7735909

fbshipit-source-id: 14c36a16486f0cc9618dcfb111fe5223781b647d
This commit is contained in:
Shihao Xu
2020-03-18 18:13:42 -07:00
committed by Facebook GitHub Bot
parent b8e043abca
commit e5ee95e448
2 changed files with 19 additions and 4 deletions

View File

@ -344,6 +344,7 @@ void RRefContext::notifyOwnerAndParentOfFork(
const ForkId& forkId,
worker_id_t parent,
const c10::intrusive_ptr<RRef>& rref) {
// Fork is shared from owner.
if (parent == rref->owner()) {
if (parent == agent_->getWorkerInfo().id_) {
// Owner sending RRef to self, remove the forkId as it was added during
@ -365,10 +366,13 @@ void RRefContext::notifyOwnerAndParentOfFork(
// Hence, it is not necessary to send another RREF_CHILD_ACCEPT or
// RREF_FORK_REQUEST back to the owner. See Note [Early Fork
// Registration].
std::lock_guard<std::mutex> lock(mutex_);
addConfirmedUser(forkId, rref);
}
return;
}
// Fork is shared from user.
if (rref->isOwner()) {
// See Note [Useful Phantom Fork ID for User to Owner Call]
// In this case, the owner is the caller, and it does not add the fork id
@ -494,10 +498,7 @@ void RRefContext::delPendingUser(const ForkId& forkId) {
// hiding the subtle logic using a reentrant lock.
deletedState = iter->second; // Increase refcount
confirmedUsers_.emplace(
std::piecewise_construct,
std::forward_as_tuple(forkId),
std::forward_as_tuple(iter->second->rref_));
addConfirmedUser(forkId, iter->second->rref_);
pendingUsers_.erase(iter); // Decrease refcount.
}
deletedState->confirm();
@ -505,6 +506,17 @@ void RRefContext::delPendingUser(const ForkId& forkId) {
deletedState.reset(); // Decrease refcount.
}
void RRefContext::addConfirmedUser(
const ForkId& forkId,
const c10::intrusive_ptr<RRef>& rref) {
// Notice, caller need to hold the mutex for confirmedUsers_.
// std::lock_guard<std::mutex> lock(mutex_);
confirmedUsers_.emplace(
std::piecewise_construct,
std::forward_as_tuple(forkId),
std::forward_as_tuple(rref));
}
void RRefContext::recordThreadLocalPendingRRefs() {
TORCH_INTERNAL_ASSERT(
userTable_.empty(),