Commit Graph

106 Commits

Author SHA1 Message Date
811ccde41a [Dynamic RPC] Add graceful shutdown for dynamic RPC members
Pull Request resolved: https://github.com/pytorch/pytorch/pull/74561

Approved by: https://github.com/mrshenli
2022-04-26 13:12:55 +00:00
285d5a55b9 Add API usage to torch.RPC (#67515)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/67515

Adding API usage to torch.rpc to better understand usage of this API.
ghstack-source-id: 141877028

Reviewed By: rohan-varma

Differential Revision: D32011465

fbshipit-source-id: 34d006ece307ae4a90fbcc6cb44fc0b7edca611e
2021-10-29 10:38:41 -07:00
da166d4f12 Add a timeout argument to RPC shutdown() (#65425)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/65425

cc pietern mrshenli pritamdamania87 zhaojuanmao satgera rohan-varma gqchen aazzolini osalpekar jiayisuse SciPioneer H-Huang gcramer23

Test Plan:
Imported from OSS

   python3 test/distributed/rpc/test_tensorpipe_agent.py -v -k test_wait_all_workers_timeout

Reviewed By: mrshenli

Differential Revision: D31092483

Pulled By: dracifer

fbshipit-source-id: 5b5e9f20b1d6602cf8cde3772678f721dddf0d78
2021-09-23 10:42:58 -07:00
1d1d5acbb0 [RPC] Ensure _wait_all_workers doesn't swallow exception. (#61094)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/61094

`_wait_all_workers` was swallowing exceptions and as a result if there
were any errors it would still continue with rpc_agent.join() which would hang
since something already failed before.

To fix this, I've ensured that wait_all_workers throws and in that case we just
proceed with an ungraceful shutdown without joining.
ghstack-source-id: 133160706

Test Plan:
1) Added unit test.
2) waitforbuildbot

Reviewed By: rohan-varma

Differential Revision: D29509286

fbshipit-source-id: 7c3f1c68d712ae2f63e10e0216580db8e9bcc29d
2021-07-07 18:28:41 -07:00
7ee68363a8 Add new rpc.barrier API (#53423)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/53423

closes #40166

This change exposes a new API, rpc.barrier() which blocks the main processes of all workers running RPC until the whole group completes this function. Optionally rpc.barrier can take in a set of worker_names and only synchronize across those worker names.

Example:
```python
import os
import torch.multiprocessing as mp
import torch.distributed.rpc as rpc
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "5678"

world_size = 4
odd_num_workers = [f"worker{i}" for i in range(world_size) if i % 2]
even_num_workers = [f"worker{i}" for i in range(world_size) if not i % 2]

def worker(i):
    print(i)
    rpc.init_rpc(f"worker{i}", rank=i, world_size=world_size)
    if i % 2:
        print(f"start barrier {i}")
        rpc.barrier(set(odd_num_workers))
    else:
        print(f"start barrier {i}")
        rpc.barrier(set(even_num_workers))
    rpc.shutdown()
    print(f"shutdown{i}")

if __name__ == '__main__':
    with mp.Pool(processes=world_size) as pool:
        pool.map(worker, range(world_size))
```

Test Plan: Imported from OSS

Reviewed By: rohan-varma

Differential Revision: D27737145

Pulled By: H-Huang

fbshipit-source-id: 369196bc62446f506d1fb6a3fa5bebcb0b09da9f
2021-06-02 14:20:16 -07:00
dc49299078 Allow passing cpu to CUDA RPC device maps (#57019)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/57019

Based on https://github.com/pytorch/pytorch/pull/56043

Test Plan: Imported from OSS

Reviewed By: anjali411

Differential Revision: D28169796

Pulled By: beauby

fbshipit-source-id: 7fcf623de07c74c4f1ab415b7e20b518876a567a
2021-05-04 04:14:27 -07:00
3a4344a717 Create helper function for RPC profiling in _invoke_rpc and remote (#56643)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/56643

Refactor enabling rpc profiling logic in `_invoke_rpc` and `remote()` into `_rpc_profiling()` helper function.

Reviewed By: rohan-varma

Differential Revision: D27922286

fbshipit-source-id: 27cfe662a401756f0ee8a3cd45978d933377f78f
2021-04-22 15:15:49 -07:00
75024e228c Add lint for unqualified type: ignore (#56290)
Summary:
The other half of https://github.com/pytorch/pytorch/issues/56272.

Pull Request resolved: https://github.com/pytorch/pytorch/pull/56290

Test Plan:
CI should pass on the tip of this PR, and we know that the lint works because the following CI runs (before this PR was finished) failed:

- https://github.com/pytorch/pytorch/runs/2384511062
- https://github.com/pytorch/pytorch/actions/runs/765036024

Reviewed By: seemethere

Differential Revision: D27867219

Pulled By: samestep

fbshipit-source-id: e648f07b6822867e70833e23ddafe7fb7eaca235
2021-04-21 08:07:23 -07:00
1ac59d9db3 Fix RPC get_worker_info for rank=0 (#52804)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/52804

`rpc.get_worker_info` used to only take string in v1.6. We recently
allow it to accept `int` and `WorkerInfo`, but the previous check
on `worker_name` is no longer correct. This commit adds explicit
`not None` check.

Test Plan: Imported from OSS

Reviewed By: rohan-varma

Differential Revision: D26655089

Pulled By: mrshenli

fbshipit-source-id: fa1545bd6dd2b33bc1e919de46b94e799ab9719c
2021-02-25 08:15:01 -08:00
a1c67b0763 Silence harmless error logs of TensorPipe agent during shutdown (#51785)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/51785

The TensorPipe pipes do not really support a "graceful" shutdown: if one side is expecting data (i.e., it has scheduled a readDescriptor call) and the other side closes, the former will receive an error. Such an error will not even be predictable, as it depends on the backend: some may detect this and report it "well" (through an EOFError), others may not be able to tell this apart from a failure and report it as such.

This meant that during shutdown some of these errors would fire and thus the agent would log them as warning. We did add a note that these were expected under some conditions, so that users wouldn't be alarmed, but it was still a far-from-ideal experience.

In principle we could build a "protocol" on top of these pipes to "agree" on a graceful shutdown, and this was the plan to solve this. However, it was rather complicated to implement.

Here I am proposing a quicker, but perhaps hackier, solution, which re-uses the already existing graceful shutdown "protocol" of the agent (i.e., the `join` method) to put the agent in a special state in which it will silence all errors due to a remote shutting down.

Such a check cannot happen in the `shutdown` method, because that's also used in case of ungraceful shutdown (in which case I believe we'd still want to display errors). Since it needs to make sure that all participants have transitioned to this new state before any of them can continue (as otherwise one of them may close its pipes before another one has realized that this is now expected), we need to perform a barrier. Hence the ideal place for it is the `join` method, where we're already doing a lot of gang-wide synchronization. Since the `join` method isn't only called during shutdown, we need to make sure we only switch the agent to this state when it's the last call to join, and we do so by adding a new optional argument to it (which will be ignored by all agents except the TensorPipe one).

I realize this isn't the prettiest solution, and since it changes the agent's API it's worth discussing it carefully. Let me know what you think!
ghstack-source-id: 121131940

Test Plan: Run on CircleCI, where this occurred quite a bit, and check the logs.

Reviewed By: mrshenli

Differential Revision: D26276137

fbshipit-source-id: 69ef14fe10908e80e627d9b4505352e482089cc8
2021-02-10 10:58:22 -08:00
c3f2f3294e [RPC] Add option to make rref.get_type not block. (#50977)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/50977

Adds a `blocking` flag that can be set to False to make this API return a `Future` to the type. This is to make this function non-blocking, mostly for a future change that will allow `rref.rpc_async()` to be completely non-blocking (it currently calls and waits for this function that issues an RPC in-line).
ghstack-source-id: 121021433

Test Plan: Modified UT

Reviewed By: mrshenli

Differential Revision: D25944582

fbshipit-source-id: e3b48a52af2d4578551a30ba6838927b489b1c03
2021-02-04 20:18:50 -08:00
ab1ba8f433 [RPC] Support timeout in rref._get_type() (#50498)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/50498

This change is mostly needed for the next diff in this stack, where
rref._get_type() is called in the rpc_async/rpc_sync RRef proxy function and
can block indefinitely if there is no timeout. It will also be useful to have a
timeout argument when we publicize this API to keep it consistent with other
RPC APIs.
ghstack-source-id: 119859767

Test Plan: Added UT

Reviewed By: pritamdamania87

Differential Revision: D25897588

fbshipit-source-id: 2e84aaf7e4faecf80005c78ee2ac8710f387503e
2021-01-15 13:18:39 -08:00
2c4b6ec457 Unused exception variables (#50181)
Summary:
These unused variables were identified by [pyflakes](https://pypi.org/project/pyflakes/). They can be safely removed to simplify the code.

Pull Request resolved: https://github.com/pytorch/pytorch/pull/50181

Reviewed By: gchanan

Differential Revision: D25844270

fbshipit-source-id: 0e648ffe8c6db6daf56788a13ba89806923cbb76
2021-01-08 13:33:18 -08:00
e6779d4357 [*.py] Rename "Arguments:" to "Args:" (#49736)
Summary:
I've written custom parsers and emitters for everything from docstrings to classes and functions. However, I recently came across an issue when I was parsing/generating from the TensorFlow codebase: inconsistent use of `Args:` and `Arguments:` in its docstrings.

```sh
(pytorch#c348fae)$ for name in 'Args:' 'Arguments:'; do
    printf '%-10s %04d\n' "$name" "$(rg -IFtpy --count-matches "$name" | paste -s -d+ -- | bc)"; done
Args:      1095
Arguments: 0336
```

It is easy enough to extend my parsers to support both variants, however it looks like `Arguments:` is wrong anyway, as per:

  - https://google.github.io/styleguide/pyguide.html#doc-function-args @ [`ddccc0f`](https://github.com/google/styleguide/blob/ddccc0f/pyguide.md)

  - https://chromium.googlesource.com/chromiumos/docs/+/master/styleguide/python.md#describing-arguments-in-docstrings @ [`9fc0fc0`](https://chromium.googlesource.com/chromiumos/docs/+/9fc0fc0/styleguide/python.md)

  - https://sphinxcontrib-napoleon.readthedocs.io/en/latest/example_google.html @ [`c0ae8e3`](https://github.com/sphinx-contrib/napoleon/blob/c0ae8e3/docs/source/example_google.rst)

Therefore, only `Args:` is valid. This PR replaces them throughout the codebase.

PS: For related PRs, see tensorflow/tensorflow/pull/45420

PPS: The trackbacks automatically appearing below are sending the same changes to other repositories in the [PyTorch](https://github.com/pytorch) organisation.

Pull Request resolved: https://github.com/pytorch/pytorch/pull/49736

Reviewed By: albanD

Differential Revision: D25710534

Pulled By: soumith

fbshipit-source-id: 61e8ff01abb433e9f78185c2d1d0cbd7c22c1619
2020-12-28 09:34:47 -08:00
eaa993a2e0 Add type annotations to torch._C._distributed_rpc module. (#46624)
Summary: Pull Request resolved: https://github.com/pytorch/pytorch/pull/46624

Test Plan: Imported from OSS

Reviewed By: glaringlee

Differential Revision: D24761656

Pulled By: xuzhao9

fbshipit-source-id: b55aee5dd2b97f573a50e5bbfddde7d984943fec
2020-11-06 01:28:51 -08:00
58ed60c259 Added context manager enabling all futures returned by rpc_async and custom build rpc functions to be automatically waited on (#41807)
Summary: Pull Request resolved: https://github.com/pytorch/pytorch/pull/41807

Test Plan: Make sure ci tests pass, including newly written test

Reviewed By: mrshenli

Differential Revision: D22640839

Pulled By: osandoval-fb

fbshipit-source-id: 3ff98d8e8c6e6d08575e307f05b5e159442d7216
2020-10-26 12:53:35 -07:00
f89498f3f8 Allow RPC framework to use rank in addition to WorkerInfo and name. (#46221)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/46221

The RPC framework only allowed sending RPCs based on provided
WorkerInfo or name. When using RPC with DDP, sometimes it might just be easier
to refer to everything in terms of ranks since DDP doesn't support names yet.

As a result, support a `to` parameter in the RPC APIs which allow for
specifying a rank as well would be helpful.
ghstack-source-id: 114207172

Test Plan:
1) waitforbuildbot
2) Unit Tests

Reviewed By: mrshenli

Differential Revision: D24264989

fbshipit-source-id: 5edf5d92e2bd2f213471dfe7c74eebfa9efc9f70
2020-10-13 17:52:54 -07:00
94c3cdd994 Let rpc._all_gather use default RPC timeout (#44983)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/44983

`_all_gather` was converted from `_wait_all_workers` and inherited its
5 seconds fixed timeout. As `_all_gather` meant to support a broader
set of use cases, the timeout configuration should be more flexible.
This PR makes `rpc._all_gather` use the global default RPC timeout.

Test Plan: Imported from OSS

Reviewed By: pritamdamania87

Differential Revision: D23794383

Pulled By: mrshenli

fbshipit-source-id: 382f52c375f0f25c032c5abfc910f72baf4c5ad9
2020-09-23 08:06:09 -07:00
09e7f62ce2 Fix RPC and ProcessGroup GIL deadlock (#45088)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/45088

Fixes #45082

Found a few problems while working on #44983

1. We deliberately swallow RPC timeouts during shutdown, as we haven't
found a good way to handle those. When we convert `_wait_all_workers`
into `_all_gather`, the same logic was inherited. However, as
`_all_gather` meant to be used in more general scenarios, we should
no longer keep silent about errors. This commit let the error throw
in `_all_gather` and also let `shutdown()` to catch them and log.
2. After fixing (1), I found that `UnpickledPythonCall` needs to
acquire GIL on destruction, and this can lead to deadlock when used
in conjuction with `ProcessGroup`. Because `ProcessGroup` ctor is a
synchronization point which holds GIL. In `init_rpc`, followers
(`rank != 0`) can exit before the leader (`rank == 0`). If the two
happens together, we could get a) on a follower, it exits `init_rpc`
after running `_broadcast_to_followers` and before the reaching dtor
of `UnpickledPythonCall`. Then it runs the ctor of `ProcessGroup`,
which holds the GIL and wait for the leader to join. However, the
leader is waiting for the response from `_broadcast_to_followers`,
which is blocked by the dtor of `UnpickledPythonCall`. And hence
the deadlock. This commit drops the GIL in `ProcessGroup` ctor.
3. After fixing (2), I found that `TensorPipe` backend
nondeterministically fails with `test_local_shutdown`, due to a
similar reason as (2), but this time it is that `shutdown()` on a
follower runs before the leader finishes `init_rpc`. This commit
adds a join for `TensorPipe` backend `init_rpc` after `_all_gather`.

The 3rd one should be able to solve the 2nd one as well. But since
I didn't see a reason to hold GIL during `ProcessGroup` ctor, I
made that change too.

Test Plan: Imported from OSS

Reviewed By: pritamdamania87

Differential Revision: D23825592

Pulled By: mrshenli

fbshipit-source-id: 94920f2ad357746a6b8e4ffaa380dd56a7310976
2020-09-21 21:47:27 -07:00
924717bf51 Add _get_type() API to RRef (#44663)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/44663

The new API returns the type of the data object referenced by this
`RRef`. On the owner, this is same as `type(rref.local_value())`.
On a user, this will trigger an RPC to fetch the `type` object from
the owner. After this function is run once, the `type` object is
cached by the `RRef`, and subsequent invocations no longer trigger
RPC.

closes #33210

Test Plan: Imported from OSS

Reviewed By: rohan-varma

Differential Revision: D23691990

Pulled By: mrshenli

fbshipit-source-id: a2d87cd601a691dd75164b6bcd7315245e9cf6bd
2020-09-16 11:59:22 -07:00
06aaf8c20d Add set_device_map to TensorPipeOptions to support GPU args (#42637)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/42637

This commit enables sending non-CPU tensors through RPC using
TensorPipe backend. Users can configure device mappings by calling
set_map_location on `TensorPipeRpcBackendOptions`. Internally,
the `init_rpc` API verifies the correctness of device mappings. It
will shutdown RPC if the check failed, or proceed and pass global
mappings to `TensorPipeAgent` if the check was successful. For serde,
we added a device indices field to TensorPipe read and write buffers,
which should be either empty (all tensors must be on CPU) or match
the tensors in order and number in the RPC message. This commit
does not yet avoid zero-copy, the tensor is always moved to CPU
on the sender and then moved to the specified device on the receiver.

Test Plan: Imported from OSS

Reviewed By: izdeby

Differential Revision: D23011572

Pulled By: mrshenli

fbshipit-source-id: 62b617eed91237d4e9926bc8551db78b822a1187
2020-08-14 18:46:55 -07:00
326d777e53 Convert _wait_all_workers to _all_gather (#42276)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/42276

This commit converts `_wait_all_workers()` to `_all_gather()` by
allowing each worker to provide its own data object. The `_all_gather()`
function blocks and returns the gathered results. This API can be
converted to `rpc.barrier()` latter.

Test Plan: Imported from OSS

Reviewed By: lw

Differential Revision: D22853480

Pulled By: mrshenli

fbshipit-source-id: 9d506813b9fd5b7c144885e2b76a863cbd19466a
2020-08-03 08:48:45 -07:00
ca1b8ebbcb move misc implementation out of jit/__init__.py (#41154)
Summary: Pull Request resolved: https://github.com/pytorch/pytorch/pull/41154

Test Plan: Imported from OSS

Reviewed By: ailzhang

Differential Revision: D22445213

Pulled By: suo

fbshipit-source-id: 200545715c5ef13beb1437f49e01efb21498ddb7
2020-07-13 16:59:55 -07:00
c93e96fbd9 [jit] move script-related implementation out of torch/jit/__init__.py (#40902)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/40902

See the bottom of this stack for context.

Test Plan: Imported from OSS

Reviewed By: eellison

Differential Revision: D22360210

Pulled By: suo

fbshipit-source-id: 4275127173a36982ce9ad357aa344435b98e1faf
2020-07-08 11:38:34 -07:00
7c07c39845 [torch.distributed.rpc] Install method docstrings from PyRRef to RRef (#40461)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/40461

It turned out `:inheried-members:` (see [doc](https://www.sphinx-doc.org/en/master/usage/extensions/autodoc.html#directive-autoclass)) is not really usable.

Because pybind11 generates a docstring that writes `self` as parent class, `rpc.PyRRef`, type.

As a workaround, I am pulling docstrings on parent-class, `PyRRef` class, into subclass, `RRef`. And do surgery on the docstring generated by pybind11.

{F241283111}

ghstack-source-id: 106472496

Test Plan:
buck test mode/dev-nosan //caffe2/test/distributed/rpc/:rpc_fork

buck build mode/dev-nosan //caffe2/test/distributed/rpc/:rpc_fork && \
buck-out/gen/caffe2/test/distributed/rpc/rpc_fork\#binary.par \
-r test_rref_str

buck build mode/dev-nosan //caffe2/test/distributed/rpc/:rpc_fork && \
buck-out/gen/caffe2/test/distributed/rpc/rpc_fork\#binary.par \
-r test_return_local_rrefs

buck test mode/dev-nosan //caffe2/torch/fb/distributed/model_parallel/tests:test_elastic_averaging -- 'test_elastic_averaging_center \(caffe2\.torch\.fb\.distributed\.model_parallel\.tests\.test_elastic_averaging\.TestElasticAveragingCenter\)'

P134031188

Differential Revision: D7933834

fbshipit-source-id: c03a8a4c9d98888b64492a8caba1591595bfe247
2020-06-23 19:58:36 -07:00
14f7e95c1a Add prefix of remote events for RPC profiling (#40066)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/40066

Builds on top of the previous PR to ensure that all remotely profiled events are prefixed with the key for the RPC that generated them.

The key is generated by the result of `_build_rpc_profiling_key` in `rpc/internal.py` and prefixed onto the event name. In order to do this, we set the current-key when creating the RPC in Python, retrieve the currently-set key in C++ and save a GloballyUniqueId -> key mapping to an in-memory map. When we receive an RPC with profiling information, we expect to receive this ID back, and look up the corresponding profiling key in the map.

The key is then added to all the remote events.

Tested by adding tests to ensure the key is added to all the remote events. Also added a UT which tests in under the multi-threading scenario, to ensure that the mapping's correctness is maintained when several RPCs are in the process of being created at once.
ghstack-source-id: 106316106

Test Plan: Unit test

Differential Revision: D22040035

fbshipit-source-id: 9215feb06084b294edbfa6e03385e13c1d730c43
2020-06-22 11:01:07 -07:00
5d0044389a Minor RPC doc improvements (#40305)
Summary: Pull Request resolved: https://github.com/pytorch/pytorch/pull/40305

Test Plan: Imported from OSS

Differential Revision: D22144304

Pulled By: mrshenli

fbshipit-source-id: 1c8a9648043eabaf909c6e4ae116672396a9f0f5
2020-06-19 15:34:58 -07:00
caf0c286b8 Fix RPC API doc links (#40299)
Summary: Pull Request resolved: https://github.com/pytorch/pytorch/pull/40299

Test Plan: Imported from OSS

Differential Revision: D22143156

Pulled By: mrshenli

fbshipit-source-id: c11848ebfe8863d59509a0fbc042eed71a58e514
2020-06-19 15:34:53 -07:00
034eddca01 Fix typos in RPC Docs (#40219)
Summary:
Environment variable MASTER_ADDRESS and MASTER_port should be MASTER_ADDR and MASTER_PORT respectively.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/40219

Differential Revision: D22116585

Pulled By: mrshenli

fbshipit-source-id: d312ae66210b0a16ec3ab1f468b1654bb0a75a0f
2020-06-18 11:40:11 -07:00
f3f30d4354 [JIT x RPC] Consolidate RRef type class and RRef impl class (#35694)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/35694

close https://github.com/pytorch/pytorch/issues/35110

Differential Revision: D7881729

fbshipit-source-id: eedda8f1b7510491886d469efeed4e002bb8b991
2020-06-18 07:46:38 -07:00
3fb1e73a4e Add rpc.async_execution support for rpc.remote on script functions (#39758)
Summary: Pull Request resolved: https://github.com/pytorch/pytorch/pull/39758

Test Plan: Imported from OSS

Differential Revision: D21963789

Pulled By: mrshenli

fbshipit-source-id: f16f464ba01401b160cc4d3daf036e4bc806d7ea
2020-06-10 13:17:07 -07:00
9bfb91b50b Fix possible deadlock in _wait_all_workers (#39535)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/39535

This is my understanding of what could happen: on workerN (N != 0), `_wait_all_workers_sequence_id_to_states`, which is a `defaultdict`, is accessed twice: once in the body of `_wait_all_workers` (by the "main thread" of workerN) and once in `_set_proceed_shutdown_signal`, called by worker0 through a RPC call. I think the two could race and access the `_wait_all_workers_sequence_id_to_states` at the same time, and thus create two separate copies of `WaitAllWorkersStates`. One of those threads would wait  on the event of one copy, but the other thread would set the event of the other copy. This lead to a deadlock, as the main thread would end up waiting forever.
ghstack-source-id: 105283327

Test Plan: I added additional logging in those functions, ran a stress test of the RPC test suite, based on the logs I suspected that this could be the issue, fixed it and re-run the stress test and didn't see the bug anymore. This is admittedly not very convincing evidence, as I may just have been lucky that second time...

Differential Revision: D21889752

fbshipit-source-id: 05ec710bd2930313e1480ae896b4b2f5f503aa17
2020-06-05 02:42:32 -07:00
8a6914ddb2 Add @rpc.functions.async_execution for rpc.remote (#39486)
Summary: Pull Request resolved: https://github.com/pytorch/pytorch/pull/39486

Test Plan: Imported from OSS

Differential Revision: D21871422

Pulled By: mrshenli

fbshipit-source-id: 3c432b7718a47732b2aee064c554f6bdcc5c95c1
2020-06-04 22:38:35 -07:00
8b2bb02e09 Implement timeout support for RRefs (#38590)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/38590

This PR implements timeout semantics for RRef for parity with rpc_sync and rpc_async. How it works:

- Timeout parameter is added to rpc.remote. If the rpc.remote call times out, note that the error won't be raised to the user in that call, as it is not blocking (similar to rpc_async). Instead, the timeout error will be raised the next time the RRef is used (either by pickling or to_here call).
- Error handling semantics are added to RRef to deal with the timeout errors. Previously, if there was an error creating the OwnerRRef, the callback on the local user would throw an error in a callback, resulting in an `std::terminate`. Instead of this, the error is now caught and surfaced to the user the next time the RRef is used. As part of this, we have added an `RPCErrorType` enum and defined RRef error handlers to handle the `RPCErrorrTypes` (currently just timeout and unknown)
- A timeout parameter is added to `to_here()` which gives the user control over the max amount of time it can block for.
- `ctx.prepareChildForFork()` which is called when the RRef is pickled (i.e. used as an arg over RPC) checks if the `rpc.remote()` call had timed out, and if so, raises that error to the user.
- Tests are added, primarily via delay injection.
ghstack-source-id: 105232837

Test Plan: CI

Differential Revision: D21588165

fbshipit-source-id: c9f9e8aa3521012ea1de3e0f152a41afdf8b23f3
2020-06-04 02:14:42 -07:00
67cea74dd3 Add rpc.async_function decorator for TorchScript functions (#39267)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/39267

When combined with `torch.jit.script`, the order of decorators matter.
`rpc.functions.async_execution` must be the outmost one. The
`async_execution` decorator will store the TorchScript function in
attribute `_wrapped_async_rpc_function` on the wrapper function, and
pass this wrapped TorchScript function (i.e., an instance of
`torch.jit.ScriptFunction`) to RPC. The caller will mark the ScriptCall
with `isAsyncExecution=true`, and the callee will extract the returned
`Future` in C++ and install subsequent processing as a callback to
that `Future`.

Test Plan: Imported from OSS

Differential Revision: D21792688

fbshipit-source-id: de095eb148d21e9114a478e9e6047c707d34fd07
2020-06-03 22:27:15 -07:00
a05ef17e46 Add rpc.functions.async_execution decorator for rpc_sync/rpc_async (#39216)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/39216

The `rpc.functions.async_execution` decorator specifies that the
wrapped function is guaranteed to return a `torch.futures.Future`.
The decorator adds a `_wrapped_async_rpc_function` attribute to
the wrapper function. The caller retrieves this information and
then sets `isAsyncFunction` argument accordingly which is later
added to PythonCall RPC message as a field. On the callee side,
if the PythonCall carries an asynchronous function, it will cast
the function's return value to a jit::PythonFutureWrapper object,
and then install response creation and communication as a callback
on the that jit::PythonFutureWrapper.

For applications, this feature is useful when a function needs to
wait for IO or additional singaling. In those cases, marking the
user function as `rpc.functions.async_execution` will prevent it
from blocking one thread on callee for too long.

Test Plan: Imported from OSS

Reviewed By: rohan-varma

Differential Revision: D21779962

fbshipit-source-id: 6b6aa698bf6f91dad6ed2a7ee433df429b59e941
2020-06-02 23:21:25 -07:00
a6f0051db2 Fix test_get_and_set_timeout for TensorPipe Agent (#39353)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/39353

This test failed with TSAN since the shortened timeout prevented all
messages from being processed within the timeout during Phase 1 of
wait_all_workers during RPC shutdown. Phase 2 already had a longer timeout, so
we extend this to Phase 1 as well.
ghstack-source-id: 105045926

Test Plan: Ran the test_get_and_set_timeout with TSAN

Differential Revision: D21826783

fbshipit-source-id: 7edfdeb50169b31e997dd36a3fd8eea0e9ae7189
2020-06-02 12:01:11 -07:00
6736a76cec Back out "[RPC] [Minor] RPC entry point cleanup"
Summary:
Original commit changeset: b509c47fb612

(Note: this ignores all push blocking failures!)

Reviewed By: xush6528

Differential Revision: D21669711

fbshipit-source-id: e452a513a2d22eaa3bffa333fdb3277fabc24b41
2020-05-20 15:35:24 -07:00
befc76bb65 [RPC] [Minor] RPC entry point cleanup (#34292)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/34292

This is to finish a cleanup request from https://github.com/pytorch/pytorch/pull/34733#discussion_r392479110.

ghstack-source-id: 104361618

Test Plan:
```
buck test mode/dev-nosan //caffe2/test/distributed/rpc/jit:rpc_fork

buck build mode/dev-nosan //caffe2/test/distributed/rpc/jit:rpc_fork && \
buck-out/gen/caffe2/test/distributed/rpc/jit/rpc_fork\#binary.par \
-r test_return_local_script_class_rref_in_py_and_use_in_script

buck build mode/dev-nosan //caffe2/test/distributed/rpc/jit:rpc_fork && \
buck-out/gen/caffe2/test/distributed/rpc/jit/rpc_fork\#binary.par \
-r test_return_local_script_module_rref_in_py_and_use_in_script
```

Differential Revision: D7436759

fbshipit-source-id: b509c47fb612ec3486ff1199c005eba69480ee05
2020-05-19 14:23:11 -07:00
4d4895a62a Use Future's then() API to fix RPC profiling (#38352)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/38352

Fixes the RPC profiling by using the `then()` API added in https://github.com/pytorch/pytorch/pull/37311. Instead of adding a regular callback, we return a new future that completes when the profiling callback is finished. This is transparent to the user as the future still completes with the value of the original future (i.e. the RPC's return value)

To make this work for RRef, we add a `_set_profiling_future` to set the profiling future, and `_get_profiling_future` to retrieve this future and wait on it in the tests.

Re-enabled profiling tests and stress tested them 1000 times to verify the fix
ghstack-source-id: 104086114

Test Plan: Re-enabled profiling tests

Differential Revision: D21506940

fbshipit-source-id: 35cde22f0551c825c9bc98ddc24cca412878a63a
2020-05-14 12:52:45 -07:00
3d0279862d Consolidate builtin/python_udf RPC to return ivalue::Future like torchscript RPC does (#35154)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/35154

This is for issue https://github.com/pytorch/pytorch/issues/34999.

close https://github.com/pytorch/pytorch/issues/34999.

https://github.com/pytorch/pytorch/issues/34997 need more work.

This will make a few work items easier, like 1) Dist autograd profiler, 2) JIT annotation for Future.

Test Plan:
```
buck test mode/dev-nosan //caffe2/test/distributed/rpc:rpc_fork

buck test mode/dev-nosan //caffe2/test/distributed/rpc:rpc_fork -- test_rref_forward_chain --stress-runs 100

buck build mode/dev-nosan //caffe2/test/distributed/rpc:rpc_fork && \
buck-out/gen/caffe2/test/distributed/rpc/rpc_fork\#binary.par \
-r test_call_method_on_rref
```

buck test mode/dev-nosan //caffe2/test/distributed/rpc:rpc_fork -- 'test_rref_proxy_class \(fb\.test_rpc_fork\.RpcTestWithFork\)' --stress-runs 100

test_rref_proxy_reuse
test_handle_send_exceptions

```
buck test mode/dev-nosan //caffe2/test/distributed/rpc/jit:rpc_fork

buck build mode/dev-nosan //caffe2/test/distributed/rpc/jit:rpc_fork && \
buck-out/gen/caffe2/test/distributed/rpc/jit/rpc_fork\#binary.par \
-r test_script_call_python_return_future
```

Differential Revision: D7722184

fbshipit-source-id: bd92b855bfea4913d6672700590c57622fa86e0e
2020-05-08 21:28:56 -07:00
7bd2014eec [resubmit][rpc] per-RPC timeouts for rpc_sync and rpc_async (#34650)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/34650

Resubmit of https://github.com/pytorch/pytorch/pull/33840, which was overly eager in the sense that it deleted a lot of code that we didn't want to get rid of yet (default timeout handling).

This PR adds an optional argument into `rpc_sync` and `rpc_async` as well as `RpcAgent::send()` that allows the user to specify a timeout for an RPC to override the default set timeout. If the user does not specify this argument, then the currently set default RPC timeout given in the RPC constructor or by `rpc.set_rpc_timeout()` is used. Otherwise, we use the passed in timeout.

This diff does not address:
1) timeout support when called rpc.rpc_async is called as a JIT operator. For this to work, we would need to change the logic in `register_distributed_ops` to pass in this timeout to `rpcTorchscript`. One more issue is that torchscript doesn't support the timedelta object. This will be done in a follow up PR as it requires a fair amount of changes to the argument parsing logic.
2) Per-RPC timeouts for internal messages or `rpc.remote()`. A follow-up diff will address the latter with the approach of raising the timeout error at the earliest next possible time to the user, such as when the next time the RRef is forked or `to_here` is called

Added unit tests to confirm the current behavior
ghstack-source-id: 102622601

Test Plan: Added unit tests in rpc_test

Differential Revision: D20376953

fbshipit-source-id: 9fb3f147520588308ab50dd33286255658d76d47
2020-04-22 13:00:42 -07:00
e75fb4356b Remove (most) Python 2 support from Python code (#35615)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/35615

Python 2 has reached end-of-life and is no longer supported by PyTorch.
Now we can clean up a lot of cruft that we put in place to support it.
These changes were all done manually, and I skipped anything that seemed
like it would take more than a few seconds, so I think it makes sense to
review it manually as well (though using side-by-side view and ignoring
whitespace change might be helpful).

Test Plan: CI

Differential Revision: D20842886

Pulled By: dreiss

fbshipit-source-id: 8cad4e87c45895e7ce3938a88e61157a79504aed
2020-04-22 09:23:14 -07:00
752d3c281a [profiler] Allow record_function ctx manager to profile futures (#35055)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/35055

This is the first step to improving the way RPCs are profiled as suggested by Ilia. For now, since RPC can return two different types of futures, we have to implement two different code paths, one for the python eager mode future and one for the jit future.

This diff implements the python eager part. We have defined a method `_call_end_callbacks_on_future` that takes in a future and schedules a `RecordFunction` to be completed as a callback on the future.

Once https://github.com/pytorch/pytorch/pull/35039 lands, we can implement the JIT codepath by registering an operator that takes a `Future(t)` as well.

These code paths will be merged once the futures are merged.
ghstack-source-id: 102478180

Test Plan: Added unit tests

Differential Revision: D20452003

fbshipit-source-id: 1acdcb073bd1f63d6fb2e78277ac0be00fd6671d
2020-04-20 12:37:54 -07:00
f59e646faa [rpc] Allow profiling in RPC to work with torchscript function invocations (#36275)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/36275

Calling a TorchScript function from within RPC was added after initial
support for the profiler with RPC, hence, we were not recording torchscript
funtions invoked under RPC correctly. This diff passes the `RecordFunction` to
the `_invoke_torchscript..` calls similar to what is done for builtin and UDFs.

However, this is only a temporary solution. We will be removing the use of
`RecordFunction` as a standalone in the RPC code in
https://github.com/pytorch/pytorch/pull/35055. This diff is to unblock
recording of torchscript functions in the meantime.
ghstack-source-id: 101800134

Test Plan:
Added tests for calling a script function with builtin, sync, and
asyc. The output looks like below:

```
------  ---------------  ---------------  ---------------  ---------------  ---------------
> Name                                                                                                        Self CPU
total %  Self CPU total   CPU total %      CPU total        CPU time avg     Number of Calls
> ----------------------------------------------------------------------------------------------------------  ---------
------  ---------------  ---------------  ---------------  ---------------  ---------------
> rpc_sync#__torch__.torch.testing._internal.distributed.rpc.rpc_test.my_script_func(worker1 -> worker2)      99.92%
        1.056s           99.92%           1.056s           1.056s           1
> select                                                                                                      0.04%
        383.661us        0.04%            383.661us        95.915us         4
> fill_                                                                                                       0.02%
        210.966us        0.02%            210.966us        52.741us         4
> to                                                                                                          0.00%
        26.276us         0.00%            26.276us         26.276us         1
> empty                                                                                                       0.02%
        159.802us        0.02%            159.802us        79.901us         2
> set_                                                                                                        0.01%
        93.818us         0.01%            93.818us         93.818us         1
> ----------------------------------------------------------------------------------------------------------  ---------
------  ---------------  ---------------  ---------------  ---------------  ---------------
> Self CPU time total: 1.057s
```

Note that we use `torch.jit._qualified_name` to get the name of the script fn.

Differential Revision: D20930453

fbshipit-source-id: c6d940aa44fcd9dd8a1a29c156aa19e0d8428d60
2020-04-08 23:58:36 -07:00
82dd01150c Fix race during RPC shutdown. (#36113)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/36113

As part of debugging https://github.com/pytorch/pytorch/issues/35863,
I discovered that the unit test would timeout during clean shutdown.

Looking into this further, it looks like there is a race in
`_on_leader_follower_report_shutdown_intent` when multiple followers call the
same method on the leader.

To fix this, I've ensured we have an appropriate lock in
`_on_leader_follower_report_shutdown_intent` to guard against this.

I ran the test 500 times to validate that this fix works.

Closes #35863
ghstack-source-id: 101641463

Test Plan:
1) waitforbuildbot
2) Ran the test 500 times.

Differential Revision: D20884373

fbshipit-source-id: 9d580e9892adffc0c9a4c2e832881fb291a1ff16
2020-04-08 14:12:33 -07:00
ac639d927a Reland "[RPC] Use qualified name str directly in RPC torch script code path" (#35489)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/35489

Relanding https://github.com/pytorch/pytorch/pull/34733.

Fix is in https://github.com/pytorch/pytorch/pull/34988

Test Plan:
```
buck test mode/dev-nosan //caffe2/test/distributed/rpc:rpc_fork
```

```
buck test mode/dev-nosan //caffe2/test/distributed/rpc/jit:rpc_fork

buck build mode/dev-nosan //caffe2/test/distributed/rpc/jit:rpc_fork && \
buck-out/gen/caffe2/test/distributed/rpc/jit/rpc_fork\#binary.par \
-r test_return_local_script_class_rref_in_py_and_use_in_script

buck build mode/dev-nosan //caffe2/test/distributed/rpc/jit:rpc_fork && \
buck-out/gen/caffe2/test/distributed/rpc/jit/rpc_fork\#binary.par \
-r test_return_local_script_module_rref_in_py_and_use_in_script
```

Differential Revision: D20661748

fbshipit-source-id: d550daab8d689d0a9aa2450f3bdb7417ab79dae2
2020-03-26 23:41:51 -07:00
5d92a6cc30 Revert D7778113: Reland "[RPC] Use qualified name str directly in RPC torch script code path"
Test Plan: revert-hammer

Differential Revision:
D7778113

Original commit changeset: b830c03ac946

fbshipit-source-id: ef08b287a6db58320c738cde0c99b3333f5724eb
2020-03-19 06:05:23 -07:00
d616cad676 Reland "[RPC] Use qualified name str directly in RPC torch script code path" (#34962)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/34962

Relanding #34733. Fix is in https://github.com/pytorch/pytorch/pull/34988.

Test Plan:
```
buck test mode/dev-nosan //caffe2/test/distributed/rpc:rpc_fork
```

```
buck test mode/dev-nosan //caffe2/test/distributed/rpc/jit:rpc_fork

buck build mode/dev-nosan //caffe2/test/distributed/rpc/jit:rpc_fork \
&& buck-out/gen/caffe2/test/distributed/rpc/jit/rpc_fork\#binary.par \
-r test_return_local_script_class_rref_in_py_and_use_in_script

buck build mode/dev-nosan //caffe2/test/distributed/rpc/jit:rpc_fork \
&& buck-out/gen/caffe2/test/distributed/rpc/jit/rpc_fork\#binary.par \
-r test_return_local_script_module_rref_in_py_and_use_in_script
```

```
buck test mode/dev //caffe2/test/distributed/rpc/jit:rpc_fork_thrift -- test_return_local_script_module_rref_in_py_and_use_in_script
```

Differential Revision: D7778113

fbshipit-source-id: b830c03ac9463075fca248eba75be364b0e8b080
2020-03-18 22:25:09 -07:00
b35e544772 Minor fixes for RPC API doc (#34955)
Summary: Pull Request resolved: https://github.com/pytorch/pytorch/pull/34955

Test Plan: Imported from OSS

Differential Revision: D20512262

Pulled By: mrshenli

fbshipit-source-id: 86ed099638fd32dc8fbde5a6f284239b146fd5e9
2020-03-18 11:20:32 -07:00