mirror of https://github.com/pytorch/pytorch.git (synced 2025-10-21 13:44:15 +08:00)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/55412

The diff resolves a bug where worker processes could exit before the torchelastic process reads their return values. This is a rare event, but it can still happen, e.g. https://fb.workplace.com/groups/319878845696681/permalink/512409069776990/

When users return a torch.Tensor object from a worker process, torchelastic multiprocessing fails. Currently a worker process finishes its job as soon as it writes its output to the IPC queue, without confirmation from the receiver process. When this happens, the underlying channel between the worker and the torchelastic process can be closed; in the case of mp.SimpleQueue the channel is a set of file descriptors, which is why we see a FileNotFoundException: since the worker process finished execution, the file descriptor just got deleted, and the torchelastic process cannot find it.

Test Plan: buck test mode/dev-nosan //caffe2/test/distributed/elastic/agent/server/test:local_agent_test

User workflow: f263531643

Reviewed By: cbalioglu

Differential Revision: D27602838

fbshipit-source-id: 29871178232e3af4ad3dec406c234aba9c5faba1
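To make the failure mode and the fix concrete, here is a minimal standalone sketch of the synchronization pattern this diff introduces (hypothetical example code, not the torchelastic implementation itself): the worker puts its return value on a SimpleQueue and then blocks on an Event, and the parent sets the event only after it has read from the queue, so the worker can no longer exit (tearing down the queue's file descriptors) before the parent reads the value.

    # Hypothetical sketch of the queue + event handshake described above;
    # the real implementation lives in _wrap() and MultiprocessContext._poll() below.
    import multiprocessing as mp
    from multiprocessing import synchronize

    def worker(ret_q, done_reading: synchronize.Event) -> None:
        ret_q.put("return-value")  # e.g. a torch.Tensor in the original bug report
        done_reading.wait()        # keep the worker (and the queue's fds) alive
                                   # until the parent confirms it read the value

    if __name__ == "__main__":
        ctx = mp.get_context("spawn")
        ret_q = ctx.SimpleQueue()
        done_reading = ctx.Event()
        p = ctx.Process(target=worker, args=(ret_q, done_reading))
        p.start()
        value = ret_q.get()   # read BEFORE allowing the worker to exit
        done_reading.set()    # now the worker may terminate safely
        p.join()
        print(value)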
594 lines · 20 KiB · Python
#!/usr/bin/env python3

# Copyright (c) Facebook, Inc. and its affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

import abc
import logging
import os
import re
import signal
import subprocess
import sys
import time
from contextlib import AbstractContextManager
from dataclasses import dataclass, field
from enum import IntFlag
from multiprocessing import synchronize
from typing import Any, Callable, Dict, Optional, Set, Tuple, Union

import torch.multiprocessing as mp
from torch.distributed.elastic.multiprocessing.errors import ProcessFailure, record
from torch.distributed.elastic.multiprocessing.redirects import (
    redirect_stderr,
    redirect_stdout,
)
from torch.distributed.elastic.multiprocessing.tail_log import TailLog

log = logging.getLogger(__name__)

def _validate_full_rank(d: Dict[int, Any], nprocs: int, what: str):
    actual_keys = set(d.keys())
    expected_keys = set(range(nprocs))

    if actual_keys != expected_keys:
        raise RuntimeError(
            f"{what}, local rank mapping mismatch,"
            f" expected: {expected_keys}, actual: {actual_keys}"
        )


_MAPPING_REGEX = r"^(\d:[0123],)*(\d:[0123])$"
_VALUE_REGEX = r"^[0123]$"

class Std(IntFlag):
    NONE = 0
    OUT = 1
    ERR = 2
    ALL = OUT | ERR

    @classmethod
    def from_str(cls, vm: str) -> Union["Std", Dict[int, "Std"]]:
        """
        Example:
        ::

         from_str("0") -> Std.NONE
         from_str("1") -> Std.OUT
         from_str("0:3,1:0,2:1,3:2") -> {0: Std.ALL, 1: Std.NONE, 2: Std.OUT, 3: Std.ERR}

        Any other input raises an exception
        """

        def to_std(v):
            v = int(v)
            for s in Std:
                if s == v:
                    return s
            # return None -> should NEVER reach here since we regex check input

        if re.match(_VALUE_REGEX, vm):  # vm is a number (e.g. 0)
            return to_std(vm)
        elif re.match(_MAPPING_REGEX, vm):  # vm is a mapping (e.g. 0:1,1:2)
            d: Dict[int, Std] = {}
            for m in vm.split(","):
                i, v = m.split(":")
                d[int(i)] = to_std(v)
            return d
        else:
            raise ValueError(
                f"{vm} does not match: <{_VALUE_REGEX}> or <{_MAPPING_REGEX}>"
            )

def to_map(
    val_or_map: Union[Std, Dict[int, Std]], local_world_size: int
) -> Dict[int, Std]:
    """
    Certain APIs take redirect settings either as a single value (e.g. apply to all
    local ranks) or as an explicit user-provided mapping. This method is a convenience
    method that converts a value or mapping into a mapping.

    Example:
    ::

     to_map(Std.OUT, local_world_size=2)  # returns: {0: Std.OUT, 1: Std.OUT}
     to_map({1: Std.OUT}, local_world_size=2)  # returns: {0: Std.NONE, 1: Std.OUT}
     to_map({0: Std.OUT, 1: Std.OUT}, local_world_size=2)  # returns: {0: Std.OUT, 1: Std.OUT}
    """
    if isinstance(val_or_map, Std):
        return {i: val_or_map for i in range(local_world_size)}
    else:
        map = {}
        for i in range(local_world_size):
            map[i] = val_or_map.get(i, Std.NONE)
        return map

@dataclass
class RunProcsResult:
    """
    Results of a completed run of processes started with ``start_processes()``.
    Returned by ``PContext``.

    Note the following:

    1. All fields are mapped by local rank
    2. ``return_values`` - only populated for functions (not the binaries).
    3. ``stdouts`` - path to stdout.log (empty string if no redirect)
    4. ``stderrs`` - path to stderr.log (empty string if no redirect)
    """

    return_values: Dict[int, Any] = field(default_factory=dict)
    failures: Dict[int, ProcessFailure] = field(default_factory=dict)
    stdouts: Dict[int, str] = field(default_factory=dict)
    stderrs: Dict[int, str] = field(default_factory=dict)

    def is_failed(self) -> bool:
        return len(self.failures) > 0

class PContext(abc.ABC):
    """
    The base class that standardizes operations over a set of processes
    that are launched via different mechanisms. The name ``PContext``
    is intentional to disambiguate with ``torch.multiprocessing.ProcessContext``.

    .. warning:: stdouts and stderrs should ALWAYS be a superset of
                 tee_stdouts and tee_stderrs (respectively); this is because
                 tee is implemented as a redirect + tail -f <stdout/stderr.log>
    """

    def __init__(
        self,
        name: str,
        entrypoint: Union[Callable, str],
        args: Dict[int, Tuple],
        envs: Dict[int, Dict[str, str]],
        stdouts: Dict[int, str],
        stderrs: Dict[int, str],
        tee_stdouts: Dict[int, str],
        tee_stderrs: Dict[int, str],
        error_files: Dict[int, str],
    ):
        self.name = name
        # validate that all mappings have the same number of keys and
        # all local ranks are accounted for
        nprocs = len(args)
        _validate_full_rank(stdouts, nprocs, "stdouts")
        _validate_full_rank(stderrs, nprocs, "stderrs")

        self.entrypoint = entrypoint
        self.args = args
        self.envs = envs
        self.stdouts = stdouts
        self.stderrs = stderrs
        self.error_files = error_files
        self.nprocs = nprocs

        self._stdout_tail = TailLog(name, tee_stdouts, sys.stdout)
        self._stderr_tail = TailLog(name, tee_stderrs, sys.stderr)

    def start(self) -> None:
        """
        Start processes using parameters defined in the constructor.
        """
        self._start()
        self._stdout_tail.start()
        self._stderr_tail.start()

    @abc.abstractmethod
    def _start(self) -> None:
        """
        Start processes using strategy defined in a particular context.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def _poll(self) -> Optional[RunProcsResult]:
        """
        Polls the run status of the processes running under this context.
        This method follows an "all-or-nothing" policy and returns
        a ``RunProcsResult`` object if either all processes complete
        successfully or any process fails. Returns ``None`` if
        all processes are still running.
        """
        raise NotImplementedError()

    def wait(self, timeout: float = -1, period: float = 1) -> Optional[RunProcsResult]:
        """
        Waits for the specified ``timeout`` seconds, polling every ``period`` seconds
        for the processes to be done. Returns ``None`` if the processes are still running
        on timeout expiry. Negative timeout values are interpreted as "wait-forever".
        A timeout value of zero simply queries the status of the processes (e.g. equivalent
        to a poll).
        """

        if timeout == 0:
            return self._poll()

        if timeout < 0:
            timeout = sys.maxsize

        expiry = time.time() + timeout
        while time.time() < expiry:
            pr = self._poll()
            if pr:
                return pr
            time.sleep(period)

        return None

    @abc.abstractmethod
    def pids(self) -> Dict[int, int]:
        """
        Returns pids of processes mapped by their respective local_ranks
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def _close(self) -> None:
        r"""
        Terminates all processes managed by this context and cleans up any
        meta resources (e.g. redirect, error_file files).
        """
        raise NotImplementedError()

    def close(self) -> None:
        self._close()
        if self._stdout_tail:
            self._stdout_tail.stop()
        if self._stderr_tail:
            self._stderr_tail.stop()

class _nullcontext(AbstractContextManager):
    # TODO remove and replace in favor of contextlib.nullcontext
    # when torch drops support for python3.6
    def __init__(self, enter_result=None):
        self.enter_result = enter_result

    def __enter__(self):
        return self.enter_result

    def __exit__(self, *excinfo):
        pass

def _wrap(
    local_rank: int,
    fn: Callable,
    args: Dict[int, Tuple],
    envs: Dict[int, Dict[str, str]],
    stdout_redirects: Dict[int, str],  # redirect file for stdout (to console if None)
    stderr_redirects: Dict[int, str],  # redirect file for stderr (to console if None)
    ret_vals: Dict[int, mp.SimpleQueue],
    queue_finished_reading_event: synchronize.Event,
) -> None:
    # get the per-rank params up front so we fail fast if no mapping is found
    args_ = args[local_rank]
    env_ = envs[local_rank]
    ret_val_ = ret_vals[local_rank]

    stdout_rd = stdout_redirects[local_rank]
    stderr_rd = stderr_redirects[local_rank]

    stdout_cm = redirect_stdout(stdout_rd) if stdout_rd else _nullcontext()
    stderr_cm = redirect_stderr(stderr_rd) if stderr_rd else _nullcontext()

    for k, v in env_.items():
        os.environ[k] = v

    with stdout_cm, stderr_cm:
        ret = record(fn)(*args_)
        ret_val_.put(ret)
        # do not exit until the parent confirms it has read the return value;
        # exiting early tears down the queue's file descriptors before the
        # parent can read them (see the summary above)
        queue_finished_reading_event.wait()

class MultiprocessContext(PContext):
    """
    ``PContext`` holding worker processes invoked as a function.
    """

    def __init__(
        self,
        name: str,
        entrypoint: Callable,
        args: Dict[int, Tuple],
        envs: Dict[int, Dict[str, str]],
        stdouts: Dict[int, str],
        stderrs: Dict[int, str],
        tee_stdouts: Dict[int, str],
        tee_stderrs: Dict[int, str],
        error_files: Dict[int, str],
        start_method: str,
    ):
        super().__init__(
            name,
            entrypoint,
            args,
            envs,
            stdouts,
            stderrs,
            tee_stdouts,
            tee_stderrs,
            error_files,
        )

        self.start_method = start_method
        # each ret_val queue will always contain a single element.
        self._ret_vals = {
            local_rank: mp.get_context(self.start_method).SimpleQueue()
            for local_rank in range(self.nprocs)
        }

        # see comments in ``join()`` for what this is
        self._return_values: Dict[int, Any] = {}
        self._pc: Optional[mp.ProcessContext] = None
        # Note: set method should ONLY be invoked for the use case when all processes finished
        # successfully. If any process died on event.wait() calling set() method will deadlock.
        self._worker_finished_event = mp.get_context(self.start_method).Event()

    def _start(self):
        if self._pc:
            raise ValueError(
                "The process context already initialized."
                " Most likely the start method got called twice."
            )
        self._pc = mp.start_processes(
            fn=_wrap,
            args=(
                self.entrypoint,
                self.args,
                self.envs,
                self.stdouts,
                self.stderrs,
                self._ret_vals,
                self._worker_finished_event,
            ),
            nprocs=self.nprocs,
            join=False,
            daemon=False,
            start_method=self.start_method,
        )

    def _is_done(self) -> bool:
        return len(self._return_values) == self.nprocs

    def _poll(self) -> Optional[RunProcsResult]:
        assert self._pc is not None  # assertion for mypy type checker

        try:
            # torch.mp.ProcessContext throws an Exception if some/all of the
            # worker processes failed. A timeout < 0 checks worker status and
            # returns immediately. Join will never return success here since we
            # use a synchronize.Event to wait for all processes to finish.
            self._pc.join(-1)

            # IMPORTANT: we use multiprocessing.Queue to carry worker return values
            # back to the parent, the worker process will wait before terminating
            # until all the buffered items are fed by the feeder thread to the underlying
            # pipe. Hence to prevent deadlocks on large return values,
            # we opportunistically try queue.get on each join call
            # See: https://docs.python.org/2/library/multiprocessing.html#all-platforms
            for local_rank in range(0, self.nprocs):
                return_queue = self._ret_vals[local_rank]
                if not return_queue.empty():
                    # save the return values temporarily into a member var
                    self._return_values[local_rank] = return_queue.get()

            if self._is_done():
                # we should ALWAYS have ALL the return values when all the processes are done
                self._worker_finished_event.set()
                # Wait until all processes are finished. At this point workers finished
                # executing the user function.
                self._pc.join()
                _validate_full_rank(
                    self._return_values, self.nprocs, "return_value queue"
                )
                self.close()
                return RunProcsResult(
                    return_values=self._return_values,
                    stdouts=self.stdouts,
                    stderrs=self.stderrs,
                )
            else:
                return None
        except (mp.ProcessRaisedException, mp.ProcessExitedException) as e:
            failed_local_rank = e.error_index

            # entrypoint for MultiprocessContext will always be a Callable
            fn_name = self.entrypoint.__qualname__  # type: ignore[union-attr]
            failed_proc = self._pc.processes[failed_local_rank]
            error_filepath = self.error_files[failed_local_rank]

            log.error(
                f"failed (exitcode: {failed_proc.exitcode})"
                f" local_rank: {failed_local_rank} (pid: {e.pid})"
                f" of fn: {fn_name} (start_method: {self.start_method})",
                exc_info=True,
            )

            self.close()
            return RunProcsResult(
                failures={
                    failed_local_rank: ProcessFailure(
                        local_rank=failed_local_rank,
                        pid=e.pid,
                        exitcode=failed_proc.exitcode,
                        error_file=error_filepath,
                    )
                },
                stdouts=self.stdouts,
                stderrs=self.stderrs,
            )

    def pids(self) -> Dict[int, int]:
        assert self._pc is not None  # assertion for mypy type checking
        return {local_rank: pid for local_rank, pid in enumerate(self._pc.pids())}

    def _close(self) -> None:
        if self._pc:
            for proc in self._pc.processes:
                proc.terminate()
                proc.join()

class SubprocessHandler:
    """
    Convenience wrapper around python's ``subprocess.Popen``. Keeps track of
    meta-objects associated with the process (e.g. stdout and stderr redirect fds).
    """

    def __init__(
        self,
        entrypoint: str,
        args: Tuple,
        env: Dict[str, str],
        preexec_fn: Callable,
        stdout: str,
        stderr: str,
    ):
        self._stdout = open(stdout, "w") if stdout else None
        self._stderr = open(stderr, "w") if stderr else None
        args_str = [str(e) for e in args]

        # inherit parent environment vars
        env_vars = os.environ.copy()
        env_vars.update(env)

        self.proc: subprocess.Popen = subprocess.Popen(
            # pyre-fixme[6]: Expected `Union[typing.Sequence[Union[_PathLike[bytes],
            #  _PathLike[str], bytes, str]], bytes, str]` for 1st param but got
            #  `Tuple[str, *Tuple[Any, ...]]`.
            args=(entrypoint, *args_str),
            env=env_vars,
            preexec_fn=preexec_fn,
            stdout=self._stdout,
            stderr=self._stderr,
        )

    def close(self):
        self.proc.terminate()
        self.proc.wait()
        if self._stdout:
            self._stdout.close()
        if self._stderr:
            self._stderr.close()

class SubprocessContext(PContext):
    """
    ``PContext`` holding worker processes invoked as a binary.
    """

    def __init__(
        self,
        name: str,
        entrypoint: str,
        args: Dict[int, Tuple],
        envs: Dict[int, Dict[str, str]],
        stdouts: Dict[int, str],
        stderrs: Dict[int, str],
        tee_stdouts: Dict[int, str],
        tee_stderrs: Dict[int, str],
        error_files: Dict[int, str],
    ):
        super().__init__(
            name,
            entrypoint,
            args,
            envs,
            stdouts,
            stderrs,
            tee_stdouts,
            tee_stderrs,
            error_files,
        )

        # state vector; tracks which local ranks are still running
        self._running_local_ranks: Set[int] = set(range(self.nprocs))
        self._failures: Dict[int, ProcessFailure] = {}
        self.subprocess_handlers: Dict[int, SubprocessHandler] = {}

    def _start(self):
        if self.subprocess_handlers:
            raise ValueError(
                "The subprocess handlers already initialized. Most likely the start method got called twice."
            )
        self.subprocess_handlers = {
            local_rank: SubprocessHandler(
                entrypoint=self.entrypoint,  # type: ignore[arg-type] # entrypoint is always a str
                args=self.args[local_rank],
                env=self.envs[local_rank],
                preexec_fn=mp._prctl_pr_set_pdeathsig(signal.SIGTERM),  # type: ignore[attr-defined]
                stdout=self.stdouts[local_rank],
                stderr=self.stderrs[local_rank],
            )
            for local_rank in range(self.nprocs)
        }

    def _poll(self) -> Optional[RunProcsResult]:
        done_local_ranks = set()
        for local_rank in self._running_local_ranks:
            handler = self.subprocess_handlers[local_rank]
            exitcode = handler.proc.poll()
            if exitcode is not None:
                done_local_ranks.add(local_rank)
                if exitcode != 0:  # failed or signaled
                    self._failures[local_rank] = ProcessFailure(
                        local_rank=local_rank,
                        pid=handler.proc.pid,
                        exitcode=exitcode,
                        error_file=self.error_files[local_rank],
                    )
                # else: --> succeeded; nothing to do

        self._running_local_ranks.difference_update(done_local_ranks)

        # if ALL procs are finished or ANY have failed
        if not self._running_local_ranks or self._failures:
            self.close()  # terminate all running procs
            result = RunProcsResult(
                failures=self._failures,
                stdouts=self.stdouts,
                stderrs=self.stderrs,
            )
            if result.is_failed():
                first_failure = min(result.failures.values(), key=lambda f: f.timestamp)
                log.error(
                    f"failed (exitcode: {first_failure.exitcode})"
                    f" local_rank: {first_failure.local_rank} (pid: {first_failure.pid})"
                    f" of binary: {self.entrypoint}"
                )
            else:
                # Populate return values with dummies for consistency with MultiprocessContext
                result.return_values = {
                    local_rank: None for local_rank in range(self.nprocs)
                }

            return result
        else:  # there are no failures and procs are still running
            return None

    def pids(self) -> Dict[int, int]:
        return {
            local_rank: sh.proc.pid
            for local_rank, sh in self.subprocess_handlers.items()
        }

    def _close(self) -> None:
        if self.subprocess_handlers:
            for handler in self.subprocess_handlers.values():
                handler.close()
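
For reference, a minimal usage sketch of the classes above. This is hypothetical: the entrypoint `trainer` and all argument values are made up for illustration, and in practice ``start_processes()`` in this package constructs and manages the context for you rather than building it by hand.

    # Hypothetical usage sketch; normally start_processes() builds the context.
    def trainer(msg: str) -> str:  # toy function entrypoint; one return value per rank
        return f"done: {msg}"

    if __name__ == "__main__":
        nprocs = 2
        pc = MultiprocessContext(
            name="trainer",
            entrypoint=trainer,
            args={i: (f"rank-{i}",) for i in range(nprocs)},
            envs={i: {"RANK": str(i)} for i in range(nprocs)},
            stdouts={i: "" for i in range(nprocs)},  # empty string -> no redirect
            stderrs={i: "" for i in range(nprocs)},
            tee_stdouts={},  # nothing to tee to the console
            tee_stderrs={},
            error_files={i: f"/tmp/error_{i}.json" for i in range(nprocs)},
            start_method="spawn",
        )
        pc.start()
        result = pc.wait()  # negative default timeout -> wait forever
        if result and not result.is_failed():
            print(result.return_values)  # e.g. {0: "done: rank-0", 1: "done: rank-1"}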