add supports_coalescing property in c10d::Backend to determine whether backend supports coalescing (#135338)

1. My company uses `privateuseone` to integrate a new hardware device and needs the `batch_isend_irecv` function. However, `batch_isend_irecv` is currently only enabled for CUDA, so I added a `supports_coalescing` property to `c10d::Backend` so callers can check whether a backend supports coalescing (a usage sketch follows below).
2. If `pg._has_hooks()` returns True, we no longer need to check whether the current device is CUDA, so a `privateuseone` backend can also use `pg._wait_for_pending_works()`.
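
A minimal usage sketch (not part of the PR) of what this enables; the ring exchange, device choice, and the assumption that the group's backend reports `supports_coalescing` are illustrative:

```python
import torch
import torch.distributed as dist

# Assumes init_process_group() has already been called (e.g. via torchrun with
# at least 2 ranks) with a backend whose c10d::Backend reports
# supportsCoalescing() == true -- NCCL, or a privateuseone backend overriding it.
rank = dist.get_rank()
world_size = dist.get_world_size()

# Place the tensors on the device the backend communicates over; a
# privateuseone device would be used here analogously.
if torch.cuda.is_available():
    device = torch.device("cuda", rank % torch.cuda.device_count())
else:
    device = torch.device("cpu")
send_tensor = torch.arange(2, dtype=torch.float32, device=device) + 2 * rank
recv_tensor = torch.zeros(2, dtype=torch.float32, device=device)

send_op = dist.P2POp(dist.isend, send_tensor, (rank + 1) % world_size)
recv_op = dist.P2POp(dist.irecv, recv_tensor, (rank - 1 + world_size) % world_size)

# After this change, the coalesced path is taken whenever the group's backend
# advertises supports_coalescing, not only when the tensors live on CUDA.
reqs = dist.batch_isend_irecv([send_op, recv_op])
for req in reqs:
    req.wait()
```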

Pull Request resolved: https://github.com/pytorch/pytorch/pull/135338
Approved by: https://github.com/kwen2501, https://github.com/albanD
taozhiwei
2025-03-04 12:37:06 +00:00
committed by PyTorch MergeBot
parent e3e45d90d8
commit 16d07988fc
6 changed files with 47 additions and 8 deletions

View File

@@ -289,6 +289,8 @@ class Backend:
     @property
     def supports_splitting(self) -> bool: ...
+    @property
+    def supports_coalescing(self) -> bool: ...
     @property
     def options(self) -> Options: ...
     def rank(self) -> int: ...
     def size(self) -> int: ...

View File

@@ -69,6 +69,10 @@ class TORCH_API Backend : public torch::CustomClassHolder {
     return false;
   }
 
+  virtual bool supportsCoalescing() const {
+    return false;
+  }
+
   virtual void startCoalescing() {
     TORCH_CHECK(
         false,

View File

@@ -824,7 +824,7 @@ class TORCH_API ProcessGroup : public torch::CustomClassHolder {
     TORCH_CHECK(
         backendTypeToBackend_.find(backendType_) != backendTypeToBackend_.end(),
         "Could not find the default backend type ",
-        backendType_,
+        uint16_t(backendType_),
         " for Process Group with name ",
         getBackendName(),
         ".");
@@ -845,7 +845,9 @@ class TORCH_API ProcessGroup : public torch::CustomClassHolder {
     TORCH_CHECK(
         backendTypeToBackend_.find(backendType) != backendTypeToBackend_.end(),
         "Could not find backend type ",
-        backendType,
+        uint16_t(backendType),
         " for Process Group with name ",
         backendTypeToString(backendType),
         ".");
     return backendTypeToBackend_.at(backendType);
   }

View File

@@ -611,6 +611,10 @@ class TORCH_API ProcessGroupNCCL : public Backend {
     return true;
   }
 
+  bool supportsCoalescing() const override {
+    return true;
+  }
+
   void startCoalescing() override;
 
   c10::intrusive_ptr<Work> endCoalescing() override;

View File

@@ -2482,6 +2482,10 @@ Arguments:
          "supports_splitting",
          &::c10d::Backend::supportsSplitting,
          "(test whether the backend supports splitting)")
+      .def_property_readonly(
+          "supports_coalescing",
+          &::c10d::Backend::supportsCoalescing,
+          "(test whether the backend supports coalescing)")
       .def(
          "broadcast",
          &::c10d::Backend::broadcast,
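
For reference, the new read-only property exposed here is what the Python-side coalescing check consults. A small, hedged probe (it uses the private `_get_default_group` / `_get_backend` helpers, so this is illustration only, not a stable API):

```python
import torch
import torch.distributed as dist
from torch.distributed.distributed_c10d import _get_default_group

# After init_process_group(), ask the default group's backend for a given
# device whether it supports coalescing.
group = _get_default_group()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
backend = group._get_backend(device)
print(backend.supports_coalescing)  # True for NCCL; False for Gloo by default
```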

View File

@@ -1912,13 +1912,34 @@ def _new_process_group_helper(
         group_rank,
         group_size,
     )
-    # Set the default backend when only single backend is passed in.
+    backend_config = BackendConfig(backend)
+    # Set the default backend when single backend is passed in.
     if "," not in str(backend) and ":" not in str(backend):
         assert backend in Backend.backend_type_map, f"Unknown backend type {backend}"
-        pg._set_default_backend(Backend.backend_type_map[backend])
+        if backend == Backend.UNDEFINED:
+            # Currently when backend is UNDEFINED, both ``gloo`` and ``nccl`` backends
+            # will be created, we use nccl(if cuda is available) or gloo as default
+            # backend so we can correctly call getDefaultBackend which in ProcessGroup.
+            if Backend.NCCL in backend_config.get_device_backend_map().values():
+                pg._set_default_backend(ProcessGroup.BackendType.NCCL)
+            else:
+                pg._set_default_backend(ProcessGroup.BackendType.GLOO)
+        else:
+            pg._set_default_backend(Backend.backend_type_map[backend])
+    # In order to correctly call pg._has_hooks(), we should set the default backend
+    # when multi backend is passed in
+    else:
+        if Backend.NCCL in backend_config.device_backend_map.values():
+            pg._set_default_backend(ProcessGroup.BackendType.NCCL)
+        elif Backend._plugins.keys():
+            custom_backend = next(iter(Backend._plugins.keys()))
+            if custom_backend in backend_config.device_backend_map.values():
+                pg._set_default_backend(ProcessGroup.BackendType.CUSTOM)
+        else:
+            pg._set_default_backend(ProcessGroup.BackendType.GLOO)
+
     if device_id:
         pg.bound_device_id = device_id
-    backend_config = BackendConfig(backend)
     backend_class: torch._C._distributed_c10d.Backend
     for device, backend_str in backend_config.get_device_backend_map().items():
         # Use the group name as prefix in the default store, such that
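
The multi-backend branch above applies when a device-to-backend mapping is passed instead of a single backend name; a hedged illustration of that call:

```python
import torch.distributed as dist

# A "device:backend" mapping string takes the multi-backend branch above; the
# default backend is then picked from the resulting device_backend_map
# (NCCL if present, else a registered custom backend, else Gloo).
# Assumes MASTER_ADDR/MASTER_PORT/RANK/WORLD_SIZE are set, e.g. by torchrun.
dist.init_process_group(backend="cpu:gloo,cuda:nccl")
```
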
@@ -2132,7 +2153,7 @@ def destroy_process_group(group: Optional[ProcessGroup] = None):
     # alive until all works and hooks are done. The current implementation does the
     # latter. Therefore, we explicitly call _wait_for_pending_works() here to wait
     # for the pending hooks to finish.
-    if pg.name().lower() == "nccl" and pg._has_hooks():
+    if type(pg) == ProcessGroup and pg._has_hooks():
         pg._wait_for_pending_works()
 
     if group is None or group == GroupMember.WORLD:
@@ -2650,13 +2671,15 @@ def batch_isend_irecv(p2p_op_list: list[P2POp]) -> list[Work]:
     """
     _check_p2p_op_list(p2p_op_list)
     group = p2p_op_list[0].group
+    if group is None:
+        group = _get_default_group()
     device = p2p_op_list[0].tensor.device
 
     def peer_kwarg(op: P2POp) -> dict[str, int]:
         key = "group_dst" if op.op == isend else "group_src"
         return {key: op.group_peer}
 
-    if device.type == "cuda":
+    if type(group) == ProcessGroup and group._get_backend(device).supports_coalescing:
         # NCCL style coalescing
         with _coalescing_manager(group, device, async_ops=True) as cm:
             for p2p_op in p2p_op_list:
@@ -2669,7 +2692,7 @@ def batch_isend_irecv(p2p_op_list: list[P2POp]) -> list[Work]:
         return cm.works
     else:
-        # Backward support for Gloo
+        # backend not support coalescing
         reqs = []
         for p2p_op in p2p_op_list:
             work = p2p_op.op(