pytorch/torch/_C/_distributed_rpc_testing.pyi
Howard Huang 7b376bf844 Remove ProcessGroup from TensorPipeAgent initialization (#68128)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/68128

Reland of D31762735 (0cbfd466d2).

This diff was originally reverted due to a failure in test_send_export_type_through_rpc_with_custom_pickler.

I updated rpc_pickler_test.py to fix a race condition in which processes could start handling rpc_sync calls before they had registered their custom pickler.
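The ordering constraint behind this race can be illustrated with a small, self-contained sketch. It does not use torch.distributed.rpc and is not the actual change made to rpc_pickler_test.py. `Worker`, `register_pickler`, and `handle_rpc` are hypothetical stand-ins, and the barrier is one assumed way to enforce "register first, then communicate": each worker installs its pickler and then waits until every peer has done the same before sending anything.

```python
import threading


class Worker:
    """Hypothetical stand-in for an RPC worker that needs a custom pickler."""

    def __init__(self, name):
        self.name = name
        self.pickler = None  # set by register_pickler()
        self.results = []

    def register_pickler(self, pickler):
        # In a real test this step would install the custom RPC pickler.
        self.pickler = pickler

    def handle_rpc(self, payload):
        # Decoding fails if the pickler is not registered yet; this is the
        # failure mode the race condition could trigger.
        if self.pickler is None:
            raise RuntimeError(f"{self.name}: pickler not registered")
        return self.pickler(payload)


def run(barrier, me, peer):
    me.register_pickler(lambda p: f"decoded:{p}")
    # Block until every worker has registered before any worker sends.
    barrier.wait()
    me.results.append(peer.handle_rpc(f"from {me.name}"))


workers = [Worker("worker0"), Worker("worker1")]
barrier = threading.Barrier(len(workers))
threads = [
    threading.Thread(target=run, args=(barrier, workers[i], workers[1 - i]))
    for i in range(2)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(workers[0].results, workers[1].results)
# ['decoded:from worker0'] ['decoded:from worker1']
```

Without the `barrier.wait()` call, a fast worker could call `handle_rpc` on a peer whose pickler is still `None`, which mirrors the intermittent failure described above.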

Test Plan:
rpc_pickler_test file:

buck test mode/dev-nosan -c 'cxx.coverage_only=caffe2' //caffe2/torch/fb/training_toolkit/backend/metrics/tests:rpc_pickler_test //caffe2/torch/fb/training_toolkit/backend/metrics/collectors/fbdata_aggregator/tests:batch_collector_test -- --run-disabled --collect-coverage '--code-coverage-session=test_session' --force-tpx

rpc_pickler stress test:

buck test mode/dev-nosan -c 'cxx.coverage_only=caffe2' //caffe2/torch/fb/training_toolkit/backend/metrics/tests:rpc_pickler_test -- --exact 'caffe2/torch/fb/training_toolkit/backend/metrics/tests:rpc_pickler_test - test_send_export_type_through_rpc_with_custom_pickler (caffe2.torch.fb.training_toolkit.backend.metrics.tests.rpc_pickler_test.CythonTypeRpcSpawnTest)' --run-disabled --collect-coverage '--code-coverage-session=test_session' --force-tpx --jobs 18 --stress-runs 10 --record-results

Reviewed By: mrshenli

Differential Revision: D32316077

fbshipit-source-id: e58de2335fbaa3ab46d46fe222c659197633a5e4
2021-11-11 12:28:55 -08:00


import torch
from ._distributed_c10d import ProcessGroup, Store
from ._distributed_rpc import (
    _TensorPipeRpcBackendOptionsBase,
    TensorPipeAgent,
    WorkerInfo,
)
from typing import List, Dict, overload
from datetime import timedelta

# This module is defined in torch/csrc/distributed/rpc/testing/init.cpp

class FaultyTensorPipeRpcBackendOptions(_TensorPipeRpcBackendOptionsBase):
    def __init__(
        self,
        num_worker_threads: int,
        rpc_timeout: float,
        init_method: str,
        messages_to_fail: List[str],
        messages_to_delay: Dict[str, float],
        num_fail_sends: int,
    ): ...
    num_send_recv_threads: int
    messages_to_fail: List[str]
    messages_to_delay: Dict[str, float]
    num_fail_sends: int

class FaultyTensorPipeAgent(TensorPipeAgent):
    def __init__(
        self,
        store: Store,
        name: str,
        rank: int,
        world_size: int,
        options: FaultyTensorPipeRpcBackendOptions,
        reverse_device_maps: Dict[str, Dict[torch.device, torch.device]],
        devices: List[torch.device],
    ): ...
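The `messages_to_fail`, `messages_to_delay`, and `num_fail_sends` fields configure fault injection for tests. As a rough mental model only, assuming that a send whose message type is listed in `messages_to_fail` fails its first `num_fail_sends` attempts (counted per distinct message) before going through, and that a type listed in `messages_to_delay` is held back for the given number of seconds, the behavior can be sketched in pure Python. `FaultySender`, `send`, and `transport` are hypothetical names, not part of torch; the real logic lives in the C++ agent, and the message-type strings below are illustrative.

```python
import time
from collections import defaultdict
from typing import Callable, Dict, List


class FaultySender:
    """Hypothetical pure-Python model of the fault-injection options."""

    def __init__(
        self,
        messages_to_fail: List[str],
        messages_to_delay: Dict[str, float],
        num_fail_sends: int,
        transport: Callable[[str, str], str],
    ):
        self.messages_to_fail = set(messages_to_fail)
        self.messages_to_delay = dict(messages_to_delay)
        self.num_fail_sends = num_fail_sends
        self.transport = transport  # the "real" send used when no fault applies
        self.fail_counts: Dict[str, int] = defaultdict(int)

    def send(self, msg_type: str, payload: str) -> str:
        if msg_type in self.messages_to_fail:
            # Fail the first num_fail_sends attempts for each distinct payload.
            if self.fail_counts[payload] < self.num_fail_sends:
                self.fail_counts[payload] += 1
                raise RuntimeError(f"Send attempt failed intentionally for {payload}")
        delay = self.messages_to_delay.get(msg_type, 0.0)
        if delay > 0:
            time.sleep(delay)  # simulate a slow link for this message type
        return self.transport(msg_type, payload)


sender = FaultySender(
    messages_to_fail=["RREF_FORK_REQUEST"],
    messages_to_delay={"SCRIPT_CALL": 0.01},
    num_fail_sends=2,
    transport=lambda t, p: f"ok:{t}:{p}",
)

attempts = 0
while True:
    attempts += 1
    try:
        reply = sender.send("RREF_FORK_REQUEST", "fork-1")
        break
    except RuntimeError:
        continue  # caller-side retry, as a test harness might do

print(attempts, reply)  # 3 ok:RREF_FORK_REQUEST:fork-1
```

Counting failures per message rather than globally lets a test check that a retry path eventually succeeds for every individual message, not just for the first one sent.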
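`FaultyTensorPipeAgent` takes `reverse_device_maps`, a mapping from peer worker name to a device-to-device dictionary. One plausible way to derive it is sketched below. The sketch assumes that every worker declares forward maps of the shape `{target_name: {local_device: remote_device}}`, the layout used by `TensorPipeRpcBackendOptions.device_maps`, and that this worker inverts the map each peer declared toward it. `create_reverse_mapping` is a hypothetical helper, and plain strings such as "cuda:0" stand in for `torch.device` so the example runs without torch.

```python
from typing import Dict

# Device names are plain strings here; the stub above uses torch.device.
DeviceMap = Dict[str, str]


def create_reverse_mapping(
    my_name: str, all_device_maps: Dict[str, Dict[str, DeviceMap]]
) -> Dict[str, DeviceMap]:
    """Invert every peer's forward map that targets ``my_name``.

    ``all_device_maps[peer][target]`` maps devices on ``peer`` to devices on
    ``target``. The result maps, per peer, devices on ``my_name`` back to the
    corresponding devices on that peer.
    """
    reverse: Dict[str, DeviceMap] = {}
    for peer, forward_maps in all_device_maps.items():
        if my_name in forward_maps:
            reverse[peer] = {
                remote: local for local, remote in forward_maps[my_name].items()
            }
    return reverse


all_maps = {
    "worker0": {"worker1": {"cuda:0": "cuda:1"}},
    "worker1": {"worker0": {"cuda:1": "cuda:0", "cuda:2": "cuda:3"}},
}
print(create_reverse_mapping("worker0", all_maps))
# {'worker1': {'cuda:0': 'cuda:1', 'cuda:3': 'cuda:2'}}
```

The inversion is only well defined when each forward map is one-to-one. This sketch does not check that, so a duplicate remote device would silently overwrite an earlier entry.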