Replace FutureMessage with ivalue::Future in RpcAgent retry logic (#49995)

Summary: Pull Request resolved: https://github.com/pytorch/pytorch/pull/49995

Test Plan: Imported from OSS

Reviewed By: lw

Differential Revision: D25745301

Pulled By: mrshenli

fbshipit-source-id: b5e3a7e0b377496924847d8d70d61de32e2d87f4
This commit is contained in:
Shen Li
2021-01-07 19:43:44 -08:00
committed by Facebook GitHub Bot
parent 008206decc
commit d730c7e261
5 changed files with 53 additions and 53 deletions

View File

@ -112,13 +112,6 @@ void RRefContext::handleException(const JitFuture& jitFuture) {
}
}
void RRefContext::handleException(const FutureMessage& fm) {
if (fm.hasError()) {
VLOG(1) << "Got exception: " << fm.error()->what();
throw std::runtime_error(fm.error()->what());
}
}
RRefContext::RRefContext(std::shared_ptr<RpcAgent> agent)
: agent_(std::move(agent)), destroyed_(false) {}
@ -219,12 +212,13 @@ void RRefContext::delUser(
// which is now idempotent. See the comment at RRefContext::delForkOfOwner
// for more details.
++numPendingFutures_;
auto fm = agent_->sendWithRetries(
auto jitFuture = agent_->sendWithRetries(
agent_->getWorkerInfo(owner),
RRefUserDelete(rrefId, forkId).toMessage());
fm->addCallback([this](const FutureMessage& fm) {
handleException(fm);
std::weak_ptr<JitFuture> wp = jitFuture;
jitFuture->addCallback([this, wp]() {
handleException(*wp.lock());
--numPendingFutures_;
});
}
@ -493,21 +487,24 @@ void RRefContext::notifyOwnerAndParentOfFork(
// into forks_. Because, there will be no real `UserRRef` associated
// with this fork ID.
++numPendingFutures_;
auto fm = agent_->sendWithRetries(
auto jitFuture = agent_->sendWithRetries(
agent_->getWorkerInfo(parent), RRefChildAccept(forkId).toMessage());
fm->addCallback([this](const FutureMessage& fm) {
handleException(fm);
std::weak_ptr<JitFuture> wp = jitFuture;
jitFuture->addCallback([this, wp]() {
handleException(*wp.lock());
--numPendingFutures_;
});
} else {
++numPendingFutures_;
auto fm = agent_->sendWithRetries(
auto jitFuture = agent_->sendWithRetries(
agent_->getWorkerInfo(rref->owner()),
RRefForkRequest(rref->rrefId(), forkId).toMessage());
addPendingUser(forkId, rref);
fm->addCallback([this, forkId, parent](const FutureMessage& fm) {
handleException(fm);
std::weak_ptr<JitFuture> wp = jitFuture;
jitFuture->addCallback([this, forkId, parent, wp]() {
handleException(*wp.lock());
this->finishForkRequest(forkId, parent);
// Decrease after calling finishForkRequest because, as that creates a new
// future, it might otherwise cause the count to briefly go to zero.
@ -686,11 +683,12 @@ void RRefContext::clearRecordedPendingRRefsOnError() {
void RRefContext::finishForkRequest(const ForkId& forkId, worker_id_t parent) {
delPendingUser(forkId);
++numPendingFutures_;
auto fm = agent_->sendWithRetries(
auto jitFuture = agent_->sendWithRetries(
agent_->getWorkerInfo(parent), RRefChildAccept(forkId).toMessage());
fm->addCallback([this](const FutureMessage& fm) {
handleException(fm);
std::weak_ptr<JitFuture> wp = jitFuture;
jitFuture->addCallback([this, wp]() {
handleException(*wp.lock());
--numPendingFutures_;
});
}