mirror of
https://github.com/pytorch/pytorch.git
synced 2025-10-20 21:14:14 +08:00
Preferring dash over underscore in command-line options. Add `--command-arg-name` to the argument parser. The old arguments with underscores `--command_arg_name` are kept for backward compatibility.
Both dashes and underscores are used in the PyTorch codebase. Some argument parsers only have dashes or only have underscores in arguments. For example, the `torchrun` utility for distributed training only accepts underscore arguments (e.g., `--master_port`). The dashes are more common in other command-line tools. And it looks to be the default choice in the Python standard library:
`argparse.BooleanOptionalAction`: 4a9dff0e5a/Lib/argparse.py (L893-L895)
```python
class BooleanOptionalAction(Action):
def __init__(...):
if option_string.startswith('--'):
option_string = '--no-' + option_string[2:]
_option_strings.append(option_string)
```
It adds `--no-argname`, not `--no_argname`. Also, typing `_` requires pressing the Shift (or Caps Lock) key, whereas `-` does not.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/94505
Approved by: https://github.com/ezyang, https://github.com/seemethere
217 lines
6.4 KiB
Python
217 lines
6.4 KiB
Python
"""Handle the details of subprocess calls and retries for a given benchmark run."""
|
|
import dataclasses
|
|
import json
|
|
import os
|
|
import pickle
|
|
import signal
|
|
import subprocess
|
|
import time
|
|
from typing import List, Optional, Union, TYPE_CHECKING
|
|
import uuid
|
|
|
|
from core.api import AutoLabels
|
|
from core.types import Label
|
|
from core.utils import get_temp_dir
|
|
from worker.main import WORKER_PATH, WorkerFailure, WorkerOutput, WorkerTimerArgs, WorkerUnpickler
|
|
|
|
if TYPE_CHECKING:
    # mypy understands the parameterized generic `subprocess.Popen[bytes]`,
    # but at runtime (pre-3.9 style) `subprocess.Popen` is not subscriptable,
    # so the alias is only parameterized for the type checker.
    PopenType = subprocess.Popen[bytes]
else:
    PopenType = subprocess.Popen
|
|
|
|
|
|
# Mitigate https://github.com/pytorch/pytorch/issues/37377
# (MKL threading-layer crash when numpy imports MKL before PyTorch.)
_ENV = "MKL_THREADING_LAYER=GNU"
_PYTHON = "python"
# Convenience form used by callers that want the env var and interpreter
# as a single shell prefix.
PYTHON_CMD = f"{_ENV} {_PYTHON}"

# We must specify `bash` so that `source activate ...` always works
SHELL = "/bin/bash"
|
|
|
|
|
|
@dataclasses.dataclass(frozen=True)
class WorkOrder:
    """Spec to schedule work with the benchmark runner."""
    label: Label
    autolabels: AutoLabels
    timer_args: WorkerTimerArgs
    source_cmd: Optional[str] = None
    timeout: Optional[float] = None
    retries: int = 0

    def __hash__(self) -> int:
        # Identity hash: each WorkOrder instance is tracked as a distinct
        # key, regardless of field equality.
        return id(self)

    def __str__(self) -> str:
        # Compact JSON summary for logging / progress display.
        summary = {
            "label": self.label,
            "autolabels": self.autolabels.as_dict,
            "num_threads": self.timer_args.num_threads,
        }
        return json.dumps(summary)
|
|
|
|
|
|
class _BenchmarkProcess:
    """Wraps subprocess.Popen for a given WorkOrder.

    The worker is launched through a shell (so an optional `source_cmd`,
    e.g. activating an environment, can be chained with `&&`) and exchanges
    its result through a pickle file rather than stdout.
    """
    _work_order: WorkOrder
    _cpu_list: Optional[str]  # e.g. "0-7"; forwarded to `taskset --cpu-list`
    _proc: PopenType

    # Internal bookkeeping
    _communication_file: str  # pickle file shared with the worker process
    _start_time: float
    _end_time: Optional[float] = None
    _retcode: Optional[int]
    _result: Optional[Union[WorkerOutput, WorkerFailure]] = None

    def __init__(self, work_order: WorkOrder, cpu_list: Optional[str]) -> None:
        self._work_order = work_order
        self._cpu_list = cpu_list
        self._start_time = time.time()
        self._communication_file = os.path.join(get_temp_dir(), f"{uuid.uuid4()}.pkl")
        # Seed the communication file with the TimerArgs. If the worker dies
        # before writing output, `_maybe_collect` will read these back and
        # treat it as a failure.
        with open(self._communication_file, "wb") as f:
            pickle.dump(self._work_order.timer_args, f)

        self._proc = subprocess.Popen(
            self.cmd,
            stdout=subprocess.PIPE,
            # Fold stderr into stdout so a single stream captures the trace.
            stderr=subprocess.STDOUT,
            shell=True,
            executable=SHELL,
        )

    def clone(self) -> "_BenchmarkProcess":
        """Start a fresh process for the same WorkOrder (used for retries)."""
        return _BenchmarkProcess(self._work_order, self._cpu_list)

    @property
    def cmd(self) -> str:
        """Full shell command line used to launch the worker."""
        cmd: List[str] = []
        if self._work_order.source_cmd is not None:
            cmd.extend([self._work_order.source_cmd, "&&"])

        cmd.append(_ENV)

        if self._cpu_list is not None:
            # Pin both OpenMP threads and the process itself to `cpu_list`.
            cmd.extend([
                f"GOMP_CPU_AFFINITY={self._cpu_list}",
                "taskset",
                "--cpu-list",
                self._cpu_list
            ])

        cmd.extend([
            _PYTHON, WORKER_PATH,
            "--communication-file", self._communication_file,
        ])
        return " ".join(cmd)

    @property
    def duration(self) -> float:
        """Elapsed wall time; keeps ticking until results are collected."""
        return (self._end_time or time.time()) - self._start_time

    @property
    def result(self) -> Union[WorkerOutput, WorkerFailure]:
        """Collected result. Callers must only access after `poll()` is non-None."""
        self._maybe_collect()
        assert self._result is not None
        return self._result

    def poll(self) -> Optional[int]:
        """Return code of the worker, or None while it is still running."""
        self._maybe_collect()
        return self._retcode

    def interrupt(self) -> None:
        """Soft interrupt. Allows subprocess to cleanup."""
        self._proc.send_signal(signal.SIGINT)

    def terminate(self) -> None:
        """Hard interrupt. Immediately SIGTERM subprocess."""
        self._proc.terminate()

    def _maybe_collect(self) -> None:
        """Harvest the worker's result once it has exited (idempotent)."""
        if self._result is not None:
            # We've already collected the results.
            return

        self._retcode = self._proc.poll()
        if self._retcode is None:
            # `_proc` is still running
            return

        with open(self._communication_file, "rb") as f:
            result = WorkerUnpickler(f).load_output()

        if isinstance(result, WorkerOutput) and self._retcode:
            # Worker managed to complete the designated task, but worker
            # process did not finish cleanly.
            result = WorkerFailure("Worker failed silently.")

        if isinstance(result, WorkerTimerArgs):
            # Worker failed, but did not write a result so we're left with the
            # original TimerArgs. Grabbing all of stdout and stderr isn't
            # ideal, but we don't have a better way to determine what to keep.
            proc_stdout = self._proc.stdout
            assert proc_stdout is not None
            result = WorkerFailure(
                failure_trace=proc_stdout.read().decode("utf-8"))

        self._result = result
        self._end_time = time.time()

        # Release communication file.
        os.remove(self._communication_file)
|
|
|
|
|
|
class InProgress:
    """Used by the benchmark runner to track outstanding jobs.

    This class handles bookkeeping and timeout + retry logic.
    """
    _proc: _BenchmarkProcess
    _timeouts: int = 0

    def __init__(self, work_order: WorkOrder, cpu_list: Optional[str]):
        self._work_order = work_order
        self._proc = _BenchmarkProcess(work_order, cpu_list)

    @property
    def work_order(self) -> WorkOrder:
        return self._proc._work_order

    @property
    def cpu_list(self) -> Optional[str]:
        return self._proc._cpu_list

    @property
    def proc(self) -> _BenchmarkProcess:
        # NB: For cleanup only.
        return self._proc

    @property
    def duration(self) -> float:
        return self._proc.duration

    def check_finished(self) -> bool:
        """Poll the worker, returning True once it has exited.

        A still-running worker that exceeds its timeout is restarted (up to
        `retries` extra attempts); once the retry budget is exhausted a
        TimeoutExpired is raised.
        """
        if self._proc.poll() is not None:
            return True

        timeout = self.work_order.timeout
        within_budget = timeout is None or self._proc.duration < timeout
        if within_budget:
            return False

        self._timeouts += 1
        max_attempts = (self._work_order.retries or 0) + 1
        if self._timeouts >= max_attempts:
            # Retry budget exhausted.
            raise subprocess.TimeoutExpired(cmd=self._proc.cmd, timeout=timeout)

        print(
            f"\nTimeout: {self._work_order.label}, {self._work_order.autolabels} "
            f"(Attempt {self._timeouts} / {max_attempts})")
        # Ask the old worker to wind down, then launch a replacement.
        self._proc.interrupt()
        self._proc = self._proc.clone()
        return False

    @property
    def result(self) -> Union[WorkerOutput, WorkerFailure]:
        return self._proc.result

    def __hash__(self) -> int:
        # Identity hash: each in-flight job is a distinct key.
        return id(self)
|