Compare commits

...

2 Commits

Author SHA1 Message Date
fedb8c643d Fix fd leakage
Signed-off-by: Yuanyuan Chen <cyyever@outlook.com>
2025-11-14 11:59:50 +08:00
b7fc169164 Use context manager
Signed-off-by: Yuanyuan Chen <cyyever@outlook.com>
2025-11-14 11:10:19 +08:00
9 changed files with 46 additions and 51 deletions

View File

@ -391,13 +391,10 @@ def _save_fx_default(current_name, folder_name, dump_example_input, gm, example_
gm.to_folder(
f"{folder_name}/{current_name}/{current_name}_{type_name}_{graph_index}"
)
pickle.dump(
input_meta,
open(
f"{folder_name}/{current_name}/{current_name}_{type_name}_{graph_index}/{current_name}_{type_name}_{graph_index}.input", # noqa: B950
"wb",
),
) # noqa: E501
with open(
f"{folder_name}/{current_name}/{current_name}_{type_name}_{graph_index}/{current_name}_{type_name}_{graph_index}.input",  # noqa: B950
"wb",
) as f:
pickle.dump(input_meta, f)
if dump_example_input:
torch.save(
args,

View File

@ -311,7 +311,7 @@ class _OnDiskCacheImpl(_CacheImpl):
r_fp, w_fp, inserted = None, None, False
try:
w_fp = open(fpath, "xb")
w_fp = open(fpath, "xb") # noqa: SIM115
except FileExistsError:
is_stale: bool = False
with open(fpath, "rb") as r_fp:
@ -322,7 +322,7 @@ class _OnDiskCacheImpl(_CacheImpl):
# match so we choose to remove the old entry so that the new
# k/v pair can be cached
fpath.unlink()
w_fp = open(fpath, "xb")
w_fp = open(fpath, "xb") # noqa: SIM115
else:
w_fp = None
finally:

View File

@ -76,7 +76,7 @@ if is_available():
def interaction(self, *args, **kwargs):
_stdin = sys.stdin
try:
sys.stdin = open("/dev/stdin")
sys.stdin = open("/dev/stdin") # noqa: SIM115
pdb.Pdb.interaction(self, *args, **kwargs)
finally:
sys.stdin = _stdin

View File

@ -281,23 +281,22 @@ class FileTimerServer:
# 2. We are running the watchdog loop in a separate daemon
# thread, which will not block the process to stop.
try:
fd = open(self._file_path)
with open(self._file_path) as fd:
self._is_client_started = True
while not self._stop_signaled:
try:
run_once = self._run_once
self._run_watchdog(fd)
if run_once:
break
self._last_progress_time = int(time.time())
except Exception:
logger.exception("Error running watchdog")
except Exception:
logger.exception("Could not open the FileTimerServer pipe")
raise
with fd:
self._is_client_started = True
while not self._stop_signaled:
try:
run_once = self._run_once
self._run_watchdog(fd)
if run_once:
break
self._last_progress_time = int(time.time())
except Exception:
logger.exception("Error running watchdog")
def _run_watchdog(self, fd: io.TextIOWrapper) -> None:
timer_requests = self._get_requests(fd, self._max_interval)
self.register_timers(timer_requests)

View File

@ -736,7 +736,7 @@ def download_url_to_file(
for _ in range(tempfile.TMP_MAX):
tmp_dst = dst + "." + uuid.uuid4().hex + ".partial"
try:
f = open(tmp_dst, "w+b")
f = open(tmp_dst, "w+b") # noqa: SIM115
except FileExistsError:
continue
break

View File

@ -746,7 +746,7 @@ class _opener(Generic[T]):
class _open_file(_opener[IO[bytes]]):
def __init__(self, name: Union[str, os.PathLike[str]], mode: str) -> None:
super().__init__(open(name, mode))
super().__init__(open(name, mode)) # noqa: SIM115
def __exit__(self, *args):
self.file_like.close()

View File

@ -43,6 +43,7 @@ from torch.testing._internal.common_utils import (
retry_on_connect_failures,
skip_but_pass_in_sandcastle,
skip_but_pass_in_sandcastle_if,
TemporaryFileName,
TEST_CUDA,
TEST_HPU,
TEST_WITH_ROCM,
@ -809,7 +810,8 @@ class MultiProcessTestCase(TestCase):
self.processes = [] # type: ignore[var-annotated]
self.rank = self.MAIN_PROCESS_RANK
self.file_name = tempfile.NamedTemporaryFile(delete=False).name
with TemporaryFileName() as name:
self.file_name = name
# NOTE(review): TemporaryFileName unlinks the file when this with-block
# exits, so self.file_name refers to a deleted path immediately afterwards.
# The replaced code used delete=False and kept the file alive — confirm the
# FileStore rendezvous recreates the file, otherwise this is a regression.
# pid to pipe consisting of error message from process.
self.pid_to_pipe = {} # type: ignore[var-annotated]
@ -1811,7 +1813,8 @@ class MultiProcContinuousTest(TestCase):
cls.task_queues = []
cls.completion_queues = []
# Need a rendezvous file for `init_process_group` purpose.
cls.rdvz_file = tempfile.NamedTemporaryFile(delete=False).name
with TemporaryFileName() as name:
cls.rdvz_file = name
# NOTE(review): the file named by cls.rdvz_file is removed at with-block
# exit, before `init_process_group` ever uses it — confirm the rendezvous
# backend recreates it (original code used delete=False and left it on disk).
# CUDA multiprocessing requires spawn instead of fork, to make sure
# child processes have their own memory space.

View File

@ -1427,7 +1427,7 @@ if IS_WINDOWS:
raise UserWarning("only TemporaryFileName with delete=False is supported on Windows.")
else:
kwargs['delete'] = False
f = tempfile.NamedTemporaryFile(*args, **kwargs)
f = tempfile.NamedTemporaryFile(*args, **kwargs) # noqa:SIM115
try:
f.close()
yield f.name

View File

@ -87,6 +87,7 @@ from torch.testing._internal.common_utils import (
skip_but_pass_in_sandcastle,
skip_but_pass_in_sandcastle_if,
skipIfRocm,
TemporaryFileName,
)
from torch.utils._python_dispatch import TorchDispatchMode
from torch.utils.data.distributed import DistributedSampler
@ -215,10 +216,7 @@ def get_profiling_event(event_name, profiler, dedup_gpu_user_annotation=False):
def get_profiler_nccl_meta(prof):
"""Torch profiler includes nccl metadata in an inserted operator called "record_param_comms"
We will need to test metadata obtained from profiler here"""
with tempfile.NamedTemporaryFile(mode="w+t", suffix=".json") as tf:
tf.close()
trace_file = tf.name
with TemporaryFileName(mode="w+t", suffix=".json") as trace_file:
prof.export_chrome_trace(trace_file)
with open(trace_file) as f:
events = json.load(f)["traceEvents"]
@ -7075,27 +7073,25 @@ class DistributedTest:
def test_ddp_profiling_execution_trace(self):
self.assertEqual(dist.get_backend(), "nccl")
# Create a temp file to save execution trace data
fp = tempfile.NamedTemporaryFile("w+t", suffix=".et.json", delete=False)
fp.close()
et_file = fp.name
et = ExecutionTraceObserver().register_callback(et_file)
with TemporaryFileName("w+t", suffix=".et.json") as et_file:
et = ExecutionTraceObserver().register_callback(et_file)
# first profiler context need not have ET
torch_profiler_ctx1 = torch.profiler.profile(
activities=[ProfilerActivity.CPU, ProfilerActivity.CUDA],
)
# collect ET in second profiler pass
torch_profiler_ctx2 = torch.profiler.profile(
activities=[ProfilerActivity.CPU, ProfilerActivity.CUDA],
execution_trace_observer=et,
)
self._test_ddp_profiling(
profiler_ctx=torch_profiler_ctx1,
profiler_ctx2=torch_profiler_ctx2,
)
# first profiler context need not have ET
torch_profiler_ctx1 = torch.profiler.profile(
activities=[ProfilerActivity.CPU, ProfilerActivity.CUDA],
)
# collect ET in second profiler pass
torch_profiler_ctx2 = torch.profiler.profile(
activities=[ProfilerActivity.CPU, ProfilerActivity.CUDA],
execution_trace_observer=et,
)
self._test_ddp_profiling(
profiler_ctx=torch_profiler_ctx1,
profiler_ctx2=torch_profiler_ctx2,
)
print(f"Execution trace saved at {fp.name}")
self._validate_execution_trace_nccl(et_file)
print(f"Execution trace saved at {et_file}")
self._validate_execution_trace_nccl(et_file)
@skip_if_lt_x_gpu(2)
@skip_but_pass_in_sandcastle_if(