Sometimes you want to query the smallest element of a collection and reach for `sorted(elements)[0]` without a second thought. However, this is not optimal, since the entire list must be sorted first (`O(n log n)`). It is better to use the built-in `min(elements)` function, which exists for exactly this purpose (`O(n)`).
Similarly, `sorted(elements)[::-1]` is not very efficient: `sorted(elements, reverse=True)` produces the same result while saving the extra slice operation (and the copy it creates).
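For illustration, a small self-contained sketch of the patterns discussed above:
```python
import random

elements = [random.random() for _ in range(1_000_000)]

# O(n log n): sorts the whole list just to read one element
smallest = sorted(elements)[0]

# O(n): single pass, no extra copy
smallest = min(elements)

# Descending order: let sorted() reverse for you instead of slicing afterwards
descending = sorted(elements, reverse=True)   # no extra pass
also_descending = sorted(elements)[::-1]      # builds a second list via the slice
```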
**TLDR: using `sorted(elements)[0]` is slow and can be replaced with `min(elements)`.**
I stumbled across these code snippets while playing around with CodeQL (see https://lgtm.com/query/4148064474379348546/).
Pull Request resolved: https://github.com/pytorch/pytorch/pull/86995
Approved by: https://github.com/jansel
Fix `use-dict-literal` pylint suggestions by changing `dict()` calls to `{}` literals. This PR applies the change to every Python file except test/jit/test_list_dict.py, where I think the intent is to test the constructor itself.
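A minimal example of the kind of change applied here:
```python
# Flagged by pylint's use-dict-literal check: builds the dict via a function call
options = dict(momentum=0.9, nesterov=True)

# Preferred: a dict literal, which is both clearer and slightly faster
options = {"momentum": 0.9, "nesterov": True}
```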
Pull Request resolved: https://github.com/pytorch/pytorch/pull/83718
Approved by: https://github.com/albanD
This is a new version of #15648 based on the latest master branch.
Unlike the previous PR where I fixed a lot of the doctests in addition to integrating xdoctest, I'm going to reduce the scope here. I'm simply going to integrate xdoctest, and then I'm going to mark all of the failing tests as "SKIP". This will let xdoctest run on the dashboards, provide some value, and still let the dashboards pass. I'll leave fixing the doctests themselves to another PR.
In my initial commit, I do the bare minimum to get something running with failing dashboards. The few tests that I marked as skip are causing segfaults. Running xdoctest results in 293 failed, 201 passed tests. The next commits will be to disable those tests. (unfortunately I don't have a tool that will insert the `#xdoctest: +SKIP` directive over every failing test, so I'm going to do this mostly manually.)
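For reference, a sketch of what the manual markup looks like; `frobnicate` is a made-up function, and the directive spelling follows the one above:
```python
def frobnicate(x):
    """
    Example:
        >>> # xdoctest: +SKIP
        >>> import torch
        >>> frobnicate(torch.randn(3))
        tensor([...])
    """
    return x
```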
Fixes https://github.com/pytorch/pytorch/issues/71105
@ezyang
Pull Request resolved: https://github.com/pytorch/pytorch/pull/82797
Approved by: https://github.com/ezyang
### Description
Across PyTorch's docstrings, both `callable` and `Callable` are used for variable types. It should be capitalized as `Callable`, since we are referring to the `Callable` type and not the Python built-in `callable()` function.
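For illustration, the docstring convention this PR standardizes on (the function below is hypothetical):
```python
from typing import Callable, Optional

def register_hook(hook: Optional[Callable] = None) -> None:
    """Registers a hook.

    Args:
        hook (Callable, optional): capitalized ``Callable`` refers to the type,
            not to the built-in ``callable()`` function.
    """
```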
### Testing
There shouldn't be any testing required.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/82487
Approved by: https://github.com/albanD
Summary:
This PR was created to resolve an issue brought up in https://fb.workplace.com/groups/319878845696681/permalink/741428653541696/
Changes:
- Adds timeout argument to RpcAgent.join()
- Add optional timeout argument to ThriftRpcAgent barrier()
- During shutdown, the ThriftRpcAgent `join()` calls the barrier; the agent uses the timeout passed to shutdown and forwards that timeout into `join()`.
- Update api.py to also fix a bug (missing timeout for the signal).
- Change the default shutdown timeout to 0 (no timeout). Existing functionality in `_all_gather` remains the same and waits indefinitely for the signal if no timeout is set for the function. The new functionality has the user specify a timeout for both the signal and the RPC calls. A usage sketch follows this list.
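A minimal usage sketch of the behavior described above (worker names and values are illustrative; the timeout plumbing into the agent's join/barrier is per this change):
```python
import torch.distributed.rpc as rpc

rpc.init_rpc("worker0", rank=0, world_size=2)
# ... issue RPCs ...

# The shutdown timeout (seconds) is forwarded into the agent's join()/barrier.
# Per this change, the default of 0 means "no timeout" (wait indefinitely).
rpc.shutdown(graceful=True, timeout=30)
```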
Pull Request resolved: https://github.com/pytorch/pytorch/pull/76194
Test Plan:
Modified barrier test
buck test torch/fb/distributed/thriftRpcBackend/test:ThriftRpcAgentTest -- BarrierTest
Reviewed By: mrshenli
Differential Revision: D35825382
fbshipit-source-id: e91e9ab5d9fca08787cb6b6b8125a4b03d1c7cde
(cherry picked from commit fcf899a387001574bf4e39a213ea741611d76097)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/61094
`_wait_all_workers` was swallowing exceptions, so if there were any errors it
would still continue with `rpc_agent.join()`, which would then hang because
something had already failed earlier.
To fix this, I've ensured that `_wait_all_workers` throws, and in that case we
just proceed with an ungraceful shutdown without joining.
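A rough sketch of the resulting control flow (simplified, not the exact implementation; helper names are illustrative):
```python
def shutdown(graceful=True):
    agent = _get_current_rpc_agent()
    if graceful:
        try:
            _wait_all_workers()   # now raises instead of swallowing errors
            agent.join()
        except Exception:
            logger.exception("Failed to complete a graceful shutdown; "
                             "proceeding with an ungraceful shutdown.")
    # Ungraceful (or post-join) teardown happens either way.
    agent.shutdown()
```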
ghstack-source-id: 133160706
Test Plan:
1) Added unit test.
2) waitforbuildbot
Reviewed By: rohan-varma
Differential Revision: D29509286
fbshipit-source-id: 7c3f1c68d712ae2f63e10e0216580db8e9bcc29d
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/53423
closes #40166
This change exposes a new API, `rpc.barrier()`, which blocks the main process of each worker running RPC until the whole group completes this function. Optionally, `rpc.barrier` can take a set of `worker_names` and only synchronize across those workers.
Example:
```python
import os
import torch.multiprocessing as mp
import torch.distributed.rpc as rpc

os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "5678"
world_size = 4
odd_num_workers = [f"worker{i}" for i in range(world_size) if i % 2]
even_num_workers = [f"worker{i}" for i in range(world_size) if not i % 2]

def worker(i):
    print(i)
    rpc.init_rpc(f"worker{i}", rank=i, world_size=world_size)
    if i % 2:
        print(f"start barrier {i}")
        rpc.barrier(set(odd_num_workers))
    else:
        print(f"start barrier {i}")
        rpc.barrier(set(even_num_workers))
    rpc.shutdown()
    print(f"shutdown{i}")

if __name__ == '__main__':
    with mp.Pool(processes=world_size) as pool:
        pool.map(worker, range(world_size))
```
Test Plan: Imported from OSS
Reviewed By: rohan-varma
Differential Revision: D27737145
Pulled By: H-Huang
fbshipit-source-id: 369196bc62446f506d1fb6a3fa5bebcb0b09da9f
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/52804
`rpc.get_worker_info` used to take only a string in v1.6. We recently
allowed it to accept an `int` and a `WorkerInfo`, but the previous check
on `worker_name` is no longer correct. This commit adds an explicit
`not None` check.
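A small sketch of the accepted inputs after this change, per the description above:
```python
import torch.distributed.rpc as rpc

info = rpc.get_worker_info("worker1")   # by name (the only form accepted in v1.6)
info = rpc.get_worker_info()            # defaults to the current worker
# Per this commit, an int rank or an existing WorkerInfo is accepted as well,
# and the dispatch now uses an explicit `is not None` check instead of truthiness.
```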
Test Plan: Imported from OSS
Reviewed By: rohan-varma
Differential Revision: D26655089
Pulled By: mrshenli
fbshipit-source-id: fa1545bd6dd2b33bc1e919de46b94e799ab9719c
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/51785
The TensorPipe pipes do not really support a "graceful" shutdown: if one side is expecting data (i.e., it has scheduled a readDescriptor call) and the other side closes, the former will receive an error. Such an error will not even be predictable, as it depends on the backend: some may detect this and report it "well" (through an EOFError), others may not be able to tell this apart from a failure and report it as such.
This meant that during shutdown some of these errors would fire and the agent would log them as warnings. We did add a note that these were expected under some conditions, so that users wouldn't be alarmed, but it was still a far-from-ideal experience.
In principle we could build a "protocol" on top of these pipes to "agree" on a graceful shutdown, and this was the plan to solve this. However, it was rather complicated to implement.
Here I am proposing a quicker, but perhaps hackier, solution, which re-uses the already existing graceful shutdown "protocol" of the agent (i.e., the `join` method) to put the agent in a special state in which it will silence all errors due to a remote shutting down.
Such a check cannot happen in the `shutdown` method, because that's also used in case of ungraceful shutdown (in which case I believe we'd still want to display errors). Since it needs to make sure that all participants have transitioned to this new state before any of them can continue (as otherwise one of them may close its pipes before another one has realized that this is now expected), we need to perform a barrier. Hence the ideal place for it is the `join` method, where we're already doing a lot of gang-wide synchronization. Since the `join` method isn't only called during shutdown, we need to make sure we only switch the agent to this state when it's the last call to join, and we do so by adding a new optional argument to it (which will be ignored by all agents except the TensorPipe one).
I realize this isn't the prettiest solution, and since it changes the agent's API it's worth discussing it carefully. Let me know what you think!
ghstack-source-id: 121131940
Test Plan: Run on CircleCI, where this occurred quite a bit, and check the logs.
Reviewed By: mrshenli
Differential Revision: D26276137
fbshipit-source-id: 69ef14fe10908e80e627d9b4505352e482089cc8
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/50977
Adds a `blocking` flag that can be set to False to make this API return a `Future` holding the type. This makes the function non-blocking, mostly for a future change that will allow `rref.rpc_async()` to be completely non-blocking (it currently calls, and waits for, this function, which issues an RPC inline).
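A minimal sketch of the non-blocking form described above (`rref` is assumed to be an existing `RRef`; `_get_type` is an internal API):
```python
# Blocking (previous behavior): returns the type directly
typ = rref._get_type()

# Non-blocking (this change): returns a Future that resolves to the type
fut = rref._get_type(blocking=False)
typ = fut.wait()
```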
ghstack-source-id: 121021433
Test Plan: Modified UT
Reviewed By: mrshenli
Differential Revision: D25944582
fbshipit-source-id: e3b48a52af2d4578551a30ba6838927b489b1c03
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/50498
This change is mostly needed for the next diff in this stack, where
rref._get_type() is called in the rpc_async/rpc_sync RRef proxy function and
can block indefinitely if there is no timeout. It will also be useful to have a
timeout argument when we publicize this API to keep it consistent with other
RPC APIs.
ghstack-source-id: 119859767
Test Plan: Added UT
Reviewed By: pritamdamania87
Differential Revision: D25897588
fbshipit-source-id: 2e84aaf7e4faecf80005c78ee2ac8710f387503e
Summary:
These unused variables were identified by [pyflakes](https://pypi.org/project/pyflakes/). They can be safely removed to simplify the code.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/50181
Reviewed By: gchanan
Differential Revision: D25844270
fbshipit-source-id: 0e648ffe8c6db6daf56788a13ba89806923cbb76
Summary: Pull Request resolved: https://github.com/pytorch/pytorch/pull/41807
Test Plan: Make sure CI tests pass, including the newly written test
Reviewed By: mrshenli
Differential Revision: D22640839
Pulled By: osandoval-fb
fbshipit-source-id: 3ff98d8e8c6e6d08575e307f05b5e159442d7216
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/46221
The RPC framework only allowed sending RPCs based on provided
WorkerInfo or name. When using RPC with DDP, sometimes it might just be easier
to refer to everything in terms of ranks since DDP doesn't support names yet.
As a result, it would be helpful to support a `to` parameter in the RPC APIs
that allows specifying a rank as well.
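A sketch of the intended usage as described in this PR (rank-based addressing; released versions may still require a worker name or `WorkerInfo`):
```python
import torch
import torch.distributed.rpc as rpc

# Equivalent ways of addressing the same peer, per this change:
ret = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(2), 1))  # by name
ret = rpc.rpc_sync(1, torch.add, args=(torch.ones(2), 1))          # by rank
```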
ghstack-source-id: 114207172
Test Plan:
1) waitforbuildbot
2) Unit Tests
Reviewed By: mrshenli
Differential Revision: D24264989
fbshipit-source-id: 5edf5d92e2bd2f213471dfe7c74eebfa9efc9f70
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/44983
`_all_gather` was converted from `_wait_all_workers` and inherited its
fixed 5-second timeout. As `_all_gather` is meant to support a broader
set of use cases, the timeout configuration should be more flexible.
This PR makes `rpc._all_gather` use the global default RPC timeout.
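For context, a sketch of where that global default comes from (values are illustrative):
```python
import torch.distributed.rpc as rpc

opts = rpc.TensorPipeRpcBackendOptions(rpc_timeout=60)  # seconds
rpc.init_rpc("worker0", rank=0, world_size=2, rpc_backend_options=opts)
# With this PR, internal helpers such as rpc.api._all_gather honor this
# configured default instead of a hard-coded 5-second value.
```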
Test Plan: Imported from OSS
Reviewed By: pritamdamania87
Differential Revision: D23794383
Pulled By: mrshenli
fbshipit-source-id: 382f52c375f0f25c032c5abfc910f72baf4c5ad9
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/45088
Fixes #45082
Found a few problems while working on #44983
1. We deliberately swallow RPC timeouts during shutdown, as we haven't
found a good way to handle them. When we converted `_wait_all_workers`
into `_all_gather`, the same logic was inherited. However, as
`_all_gather` is meant to be used in more general scenarios, we should
no longer stay silent about errors. This commit lets the error throw
in `_all_gather` and also lets `shutdown()` catch it and log.
2. After fixing (1), I found that `UnpickledPythonCall` needs to
acquire the GIL on destruction, and this can lead to a deadlock when used
in conjunction with `ProcessGroup`, because the `ProcessGroup` ctor is a
synchronization point which holds the GIL. In `init_rpc`, followers
(`rank != 0`) can exit before the leader (`rank == 0`). If the two
happen together, we can get the following: on a follower, it exits `init_rpc`
after running `_broadcast_to_followers` and before reaching the dtor
of `UnpickledPythonCall`. Then it runs the ctor of `ProcessGroup`,
which holds the GIL and waits for the leader to join. However, the
leader is waiting for the response from `_broadcast_to_followers`,
which is blocked by the dtor of `UnpickledPythonCall`. Hence
the deadlock. This commit drops the GIL in the `ProcessGroup` ctor.
3. After fixing (2), I found that `TensorPipe` backend
nondeterministically fails with `test_local_shutdown`, due to a
similar reason as (2), but this time it is that `shutdown()` on a
follower runs before the leader finishes `init_rpc`. This commit
adds a join for `TensorPipe` backend `init_rpc` after `_all_gather`.
The 3rd fix should be able to solve the 2nd issue as well. But since
I didn't see a reason to hold the GIL during the `ProcessGroup` ctor, I
made that change too.
Test Plan: Imported from OSS
Reviewed By: pritamdamania87
Differential Revision: D23825592
Pulled By: mrshenli
fbshipit-source-id: 94920f2ad357746a6b8e4ffaa380dd56a7310976
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/44663
The new API returns the type of the data object referenced by this
`RRef`. On the owner, this is the same as `type(rref.local_value())`.
On a user, this will trigger an RPC to fetch the `type` object from
the owner. After this function has run once, the `type` object is
cached by the `RRef`, and subsequent invocations no longer trigger
an RPC.
closes #33210
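A sketch of the behavior described above, using the `_get_type()` name this API is referred to by elsewhere in this series (`rref` is assumed to exist):
```python
# On the owner: equivalent to inspecting the local value directly
assert rref._get_type() is type(rref.local_value())

# On a user: the first call issues an RPC to the owner...
t = rref._get_type()
# ...and the result is cached, so later calls return the cached type object
assert rref._get_type() is t
```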
Test Plan: Imported from OSS
Reviewed By: rohan-varma
Differential Revision: D23691990
Pulled By: mrshenli
fbshipit-source-id: a2d87cd601a691dd75164b6bcd7315245e9cf6bd
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/42637
This commit enables sending non-CPU tensors through RPC using
TensorPipe backend. Users can configure device mappings by calling
set_map_location on `TensorPipeRpcBackendOptions`. Internally,
the `init_rpc` API verifies the correctness of device mappings. It
will shut down RPC if the check fails, or proceed and pass the global
mappings to `TensorPipeAgent` if it succeeds. For serde,
we added a device-indices field to TensorPipe read and write buffers,
which should either be empty (all tensors must be on CPU) or match
the tensors in the RPC message in order and number. This commit
does not yet achieve zero-copy: the tensor is always moved to CPU
on the sender and then moved to the specified device on the receiver.
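A sketch of the configuration described above. The method name `set_map_location` is taken from this commit; later releases expose the same idea as a device-map setter on `TensorPipeRpcBackendOptions`, so treat the exact name as an assumption:
```python
import torch.distributed.rpc as rpc

opts = rpc.TensorPipeRpcBackendOptions()
# Tensors sent from this worker's cuda:0 should land on worker1's cuda:1.
opts.set_map_location("worker1", {"cuda:0": "cuda:1"})

# init_rpc verifies the mappings and passes them to the TensorPipeAgent.
rpc.init_rpc("worker0", rank=0, world_size=2, rpc_backend_options=opts)
```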
Test Plan: Imported from OSS
Reviewed By: izdeby
Differential Revision: D23011572
Pulled By: mrshenli
fbshipit-source-id: 62b617eed91237d4e9926bc8551db78b822a1187
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/42276
This commit converts `_wait_all_workers()` to `_all_gather()` by
allowing each worker to provide its own data object. The `_all_gather()`
function blocks and returns the gathered results. This API can be
converted to `rpc.barrier()` later.
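A minimal sketch of the converted helper, assuming the internal `_all_gather` described above:
```python
import torch.distributed.rpc as rpc
from torch.distributed.rpc.api import _all_gather

# Each worker contributes its own object; the call blocks until the whole
# group has contributed and then returns the gathered results.
gathered = _all_gather(f"data from {rpc.get_worker_info().name}")
```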
Test Plan: Imported from OSS
Reviewed By: lw
Differential Revision: D22853480
Pulled By: mrshenli
fbshipit-source-id: 9d506813b9fd5b7c144885e2b76a863cbd19466a
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/40902
See the bottom of this stack for context.
Test Plan: Imported from OSS
Reviewed By: eellison
Differential Revision: D22360210
Pulled By: suo
fbshipit-source-id: 4275127173a36982ce9ad357aa344435b98e1faf
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/40461
It turned out `:inherited-members:` (see [doc](https://www.sphinx-doc.org/en/master/usage/extensions/autodoc.html#directive-autoclass)) is not really usable,
because pybind11 generates a docstring that writes `self` as the parent-class type, `rpc.PyRRef`.
As a workaround, I am pulling the docstrings from the parent class, `PyRRef`, into the subclass, `RRef`, and doing surgery on the docstring generated by pybind11.
ghstack-source-id: 106472496
Test Plan:
buck test mode/dev-nosan //caffe2/test/distributed/rpc/:rpc_fork
buck build mode/dev-nosan //caffe2/test/distributed/rpc/:rpc_fork && \
buck-out/gen/caffe2/test/distributed/rpc/rpc_fork\#binary.par \
-r test_rref_str
buck build mode/dev-nosan //caffe2/test/distributed/rpc/:rpc_fork && \
buck-out/gen/caffe2/test/distributed/rpc/rpc_fork\#binary.par \
-r test_return_local_rrefs
buck test mode/dev-nosan //caffe2/torch/fb/distributed/model_parallel/tests:test_elastic_averaging -- 'test_elastic_averaging_center \(caffe2\.torch\.fb\.distributed\.model_parallel\.tests\.test_elastic_averaging\.TestElasticAveragingCenter\)'
P134031188
Differential Revision: D7933834
fbshipit-source-id: c03a8a4c9d98888b64492a8caba1591595bfe247
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/40066
Builds on top of the previous PR to ensure that all remotely profiled events are prefixed with the key for the RPC that generated them.
The key is generated by the result of `_build_rpc_profiling_key` in `rpc/internal.py` and prefixed onto the event name. In order to do this, we set the current-key when creating the RPC in Python, retrieve the currently-set key in C++ and save a GloballyUniqueId -> key mapping to an in-memory map. When we receive an RPC with profiling information, we expect to receive this ID back, and look up the corresponding profiling key in the map.
The key is then added to all the remote events.
Tested by adding tests to ensure the key is added to all the remote events. Also added a UT which exercises this under a multi-threading scenario, to ensure that the mapping's correctness is maintained when several RPCs are in the process of being created at once.
ghstack-source-id: 106316106
Test Plan: Unit test
Differential Revision: D22040035
fbshipit-source-id: 9215feb06084b294edbfa6e03385e13c1d730c43
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/39535
This is my understanding of what could happen: on workerN (N != 0), `_wait_all_workers_sequence_id_to_states`, which is a `defaultdict`, is accessed twice: once in the body of `_wait_all_workers` (by the "main thread" of workerN) and once in `_set_proceed_shutdown_signal`, called by worker0 through an RPC call. I think the two could race and access `_wait_all_workers_sequence_id_to_states` at the same time, and thus create two separate copies of `WaitAllWorkersStates`. One of those threads would wait on the event of one copy, but the other thread would set the event of the other copy. This led to a deadlock, as the main thread would end up waiting forever.
ghstack-source-id: 105283327
Test Plan: I added additional logging in those functions, ran a stress test of the RPC test suite, based on the logs I suspected that this could be the issue, fixed it and re-run the stress test and didn't see the bug anymore. This is admittedly not very convincing evidence, as I may just have been lucky that second time...
Differential Revision: D21889752
fbshipit-source-id: 05ec710bd2930313e1480ae896b4b2f5f503aa17
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/38590
This PR implements timeout semantics for RRef for parity with rpc_sync and rpc_async. How it works:
- Timeout parameter is added to rpc.remote. If the rpc.remote call times out, note that the error won't be raised to the user in that call, as it is not blocking (similar to rpc_async). Instead, the timeout error will be raised the next time the RRef is used (either by pickling or to_here call).
- Error handling semantics are added to RRef to deal with the timeout errors. Previously, if there was an error creating the OwnerRRef, the callback on the local user would itself throw, resulting in an `std::terminate`. Instead of this, the error is now caught and surfaced to the user the next time the RRef is used. As part of this, we have added an `RPCErrorType` enum and defined RRef error handlers to handle the `RPCErrorType`s (currently just timeout and unknown).
- A timeout parameter is added to `to_here()` which gives the user control over the max amount of time it can block for (see the sketch after this list).
- `ctx.prepareChildForFork()` which is called when the RRef is pickled (i.e. used as an arg over RPC) checks if the `rpc.remote()` call had timed out, and if so, raises that error to the user.
- Tests are added, primarily via delay injection.
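A small usage sketch of the timeout knobs described above (worker name and values are illustrative):
```python
import torch
import torch.distributed.rpc as rpc

# If creating the OwnerRRef takes longer than 2 seconds, the error is not
# raised here, since rpc.remote is non-blocking...
rref = rpc.remote("worker1", torch.add, args=(torch.ones(2), 1), timeout=2.0)

# ...it surfaces on the next use of the RRef, e.g. to_here(), which itself
# takes a timeout bounding how long it may block.
result = rref.to_here(timeout=5.0)
```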
ghstack-source-id: 105232837
Test Plan: CI
Differential Revision: D21588165
fbshipit-source-id: c9f9e8aa3521012ea1de3e0f152a41afdf8b23f3
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/39267
When combined with `torch.jit.script`, the order of decorators matters.
`rpc.functions.async_execution` must be the outermost one. The
`async_execution` decorator will store the TorchScript function in
attribute `_wrapped_async_rpc_function` on the wrapper function, and
pass this wrapped TorchScript function (i.e., an instance of
`torch.jit.ScriptFunction`) to RPC. The caller will mark the ScriptCall
with `isAsyncExecution=true`, and the callee will extract the returned
`Future` in C++ and install subsequent processing as a callback to
that `Future`.
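For illustration, a sketch of the decorator ordering described above (close to the documented pattern; names are illustrative):
```python
import torch
import torch.distributed.rpc as rpc
from torch import Tensor
from torch.futures import Future

# async_execution must be the outermost decorator, so that the underlying
# torch.jit.ScriptFunction is what gets stored on _wrapped_async_rpc_function
# and handed to RPC with isAsyncExecution=true.
@rpc.functions.async_execution
@torch.jit.script
def async_add(to: str, x: Tensor, y: Tensor) -> Future[Tensor]:
    return rpc.rpc_async(to, torch.add, (x, y))
```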
Test Plan: Imported from OSS
Differential Revision: D21792688
fbshipit-source-id: de095eb148d21e9114a478e9e6047c707d34fd07
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/39216
The `rpc.functions.async_execution` decorator specifies that the
wrapped function is guaranteed to return a `torch.futures.Future`.
The decorator adds a `_wrapped_async_rpc_function` attribute to
the wrapper function. The caller retrieves this information and
then sets `isAsyncFunction` argument accordingly which is later
added to PythonCall RPC message as a field. On the callee side,
if the PythonCall carries an asynchronous function, it will cast
the function's return value to a jit::PythonFutureWrapper object,
and then install response creation and communication as a callback
on that jit::PythonFutureWrapper.
For applications, this feature is useful when a function needs to
wait for IO or additional signaling. In those cases, marking the
user function as `rpc.functions.async_execution` will prevent it
from blocking one thread on callee for too long.
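A minimal sketch of the pattern this enables (close to the documented example; names are illustrative):
```python
import torch
import torch.distributed.rpc as rpc

@rpc.functions.async_execution
def async_add_chained(to, x, y, z):
    # Return a torch.futures.Future immediately; the callee thread is not
    # blocked while the nested RPC is in flight. The response is created and
    # sent once the returned Future completes.
    return rpc.rpc_async(to, torch.add, args=(x, y)).then(
        lambda fut: fut.value() + z
    )
```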
Test Plan: Imported from OSS
Reviewed By: rohan-varma
Differential Revision: D21779962
fbshipit-source-id: 6b6aa698bf6f91dad6ed2a7ee433df429b59e941
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/39353
This test failed with TSAN since the shortened timeout prevented all
messages from being processed during Phase 1 of `wait_all_workers` in RPC
shutdown. Phase 2 already had a longer timeout, so we extend it to Phase 1 as
well.
ghstack-source-id: 105045926
Test Plan: Ran the test_get_and_set_timeout with TSAN
Differential Revision: D21826783
fbshipit-source-id: 7edfdeb50169b31e997dd36a3fd8eea0e9ae7189