mirror of
https://github.com/pytorch/pytorch.git
synced 2025-10-20 21:14:14 +08:00
[redone][pytorch] Moving torch.compile worker process logs to a dedicated rank based log directory (#160352)
Summary: Writing torch.compile worker logs to dedicated_log_rank{RANK} if we're running on MAST. ref: D79456310 (got reverted because of a linter failure). Testing: refer to Differential Revision: D79917440. Pull Request resolved: https://github.com/pytorch/pytorch/pull/160352 Approved by: https://github.com/masnesral
This commit is contained in:
committed by
PyTorch MergeBot
parent
a7abf57aab
commit
94b91a8763
@ -1,6 +1,7 @@
|
||||
# Owner(s): ["module: inductor"]
|
||||
import operator
|
||||
import os
|
||||
import tempfile
|
||||
|
||||
from torch._inductor.compile_worker.subproc_pool import (
|
||||
raise_testexc,
|
||||
@ -66,6 +67,19 @@ class TestCompileWorker(TestCase):
|
||||
finally:
|
||||
pool.shutdown()
|
||||
|
||||
@skipIfWindows(msg="pass_fds not supported on Windows.")
def test_logging(self):
    """Verify SubprocPool writes worker output to the configured log path.

    Simulates a MAST job environment (MAST_HPC_JOB_NAME / ROLE_RANK) and
    points TORCHINDUCTOR_WORKER_LOGPATH at a temp file, then checks the
    log file exists after submitting work.
    """
    # Save prior values so we can restore them: the original version
    # left these env vars set, leaking state into later tests in the
    # same process.
    _ENV_KEYS = ("MAST_HPC_JOB_NAME", "ROLE_RANK", "TORCHINDUCTOR_WORKER_LOGPATH")
    saved = {key: os.environ.get(key) for key in _ENV_KEYS}
    os.environ["MAST_HPC_JOB_NAME"] = "test_job"
    os.environ["ROLE_RANK"] = "0"
    try:
        with tempfile.NamedTemporaryFile(delete=True) as temp_log:
            os.environ["TORCHINDUCTOR_WORKER_LOGPATH"] = temp_log.name
            pool = SubprocPool(2)
            try:
                pool.submit(operator.add, 100, 1)
                self.assertEqual(os.path.exists(temp_log.name), True)
            finally:
                pool.shutdown()
    finally:
        # Restore the environment exactly as it was (drop keys that
        # were previously unset).
        for key, value in saved.items():
            if value is None:
                os.environ.pop(key, None)
            else:
                os.environ[key] = value
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from torch._inductor.test_case import run_tests
|
||||
|
@ -145,10 +145,19 @@ class SubprocPool:
|
||||
f"--write-fd={str(subproc_write_fd)}",
|
||||
f"--torch-key={torch_key_str}",
|
||||
]
|
||||
local = False
|
||||
log_path = None
|
||||
self.log_file = None
|
||||
|
||||
if config.worker_suppress_logging:
|
||||
log_path = os.devnull
|
||||
log.info("Suppressing compile worker output due to config")
|
||||
local = True
|
||||
else:
|
||||
log_path = config.torchinductor_worker_logpath
|
||||
if not log_path:
|
||||
log_path = config.get_worker_log_path()
|
||||
|
||||
if log_path:
|
||||
self.log_file = open(log_path, "w")
|
||||
|
||||
self.process = subprocess.Popen(
|
||||
cmd,
|
||||
@ -164,8 +173,8 @@ class SubprocPool:
|
||||
"LD_LIBRARY_PATH": get_ld_library_path(),
|
||||
},
|
||||
pass_fds=(subproc_read_fd, subproc_write_fd),
|
||||
stdout=subprocess.DEVNULL if local else None,
|
||||
stderr=subprocess.DEVNULL if local else None,
|
||||
stdout=self.log_file,
|
||||
stderr=self.log_file,
|
||||
)
|
||||
self.write_lock = threading.Lock()
|
||||
self.read_thread = threading.Thread(
|
||||
@ -262,6 +271,8 @@ class SubprocPool:
|
||||
_send_msg(self.write_pipe, MsgHeader.SHUTDOWN)
|
||||
self.write_pipe.close()
|
||||
self.process.wait(300)
|
||||
if self.log_file:
|
||||
self.log_file.close()
|
||||
except OSError as e:
|
||||
log.warning("Ignored OSError in pool shutdown: %s", e)
|
||||
finally:
|
||||
|
@ -1020,6 +1020,24 @@ enable_caching_generated_triton_templates: bool = True
|
||||
autotune_lookup_table: dict[str, dict[str, Any]] = {}
|
||||
|
||||
|
||||
def get_worker_log_path() -> Optional[str]:
    """Return the dedicated compile-worker log file path, or None.

    A path is produced only for fbcode builds that appear to be running
    under a MAST job (detected via the MAST_HPC_JOB_NAME env var); the
    rank component comes from ROLE_RANK, defaulting to "0".
    """
    if not is_fbcode():
        return None
    # Absence of MAST_HPC_JOB_NAME means we are not on MAST: no log path.
    if os.environ.get("MAST_HPC_JOB_NAME") is None:
        return None
    rank = os.environ.get("ROLE_RANK", "0")
    return f"/logs/dedicated_log_torch_compile_worker_rank{rank}"
|
||||
|
||||
|
||||
# Explicit override for where compile-worker output is written; empty
# string means "unset" (callers fall back to get_worker_log_path()).
# Forced by the TORCHINDUCTOR_WORKER_LOGPATH env var when present.
torchinductor_worker_logpath: str = Config(
    env_name_force="TORCHINDUCTOR_WORKER_LOGPATH",
    default="",
)
|
||||
|
||||
|
||||
# config specific to codegen/cpp.py
|
||||
class cpp:
|
||||
"""
|
||||
|
Reference in New Issue
Block a user