mirror of
https://github.com/pytorch/pytorch.git
synced 2025-10-20 21:14:14 +08:00
Revert "[pytorch] Moving torch.compile worker process logs to a dedicated rank based log directory (#159874)"
This reverts commit 9fd5b5f73589cf08dca60910368cc0f05c7906c8. Reverted https://github.com/pytorch/pytorch/pull/159874 on behalf of https://github.com/malfet due to Broke lint ([comment](https://github.com/pytorch/pytorch/pull/159874#issuecomment-3161896978))
This commit is contained in:
@@ -1,8 +1,6 @@
|
||||
# Owner(s): ["module: inductor"]
|
||||
import importlib
|
||||
import operator
|
||||
import os
|
||||
import tempfile
|
||||
|
||||
from torch._inductor.compile_worker.subproc_pool import (
|
||||
raise_testexc,
|
||||
@@ -13,6 +11,7 @@ from torch._inductor.test_case import TestCase
|
||||
from torch.testing._internal.common_utils import skipIfWindows
|
||||
from torch.testing._internal.inductor_utils import HAS_CPU
|
||||
|
||||
|
||||
class TestCompileWorker(TestCase):
|
||||
@skipIfWindows(msg="pass_fds not supported on Windows.")
|
||||
def test_basic_jobs(self):
|
||||
@@ -67,18 +66,6 @@ class TestCompileWorker(TestCase):
|
||||
finally:
|
||||
pool.shutdown()
|
||||
|
||||
@skipIfWindows(msg="pass_fds not supported on Windows.")
def test_logging(self):
    """Worker output should be redirected to the rank-based log file.

    Setting MAST_HPC_JOB_NAME makes SubprocPool open the path named by
    TORCHINDUCTOR_WORKER_LOGPATH (with the rank appended) and pipe the
    worker's stdout/stderr into it; we assert the file exists afterwards.
    """
    # Save the env vars we mutate so the test does not leak state into
    # other tests running in the same process (the original version
    # left all three variables permanently set).
    touched = ("MAST_HPC_JOB_NAME", "ROLE_RANK", "TORCHINDUCTOR_WORKER_LOGPATH")
    saved = {key: os.environ.get(key) for key in touched}
    try:
        os.environ["MAST_HPC_JOB_NAME"] = "test_job"
        os.environ["ROLE_RANK"] = "0"
        with tempfile.NamedTemporaryFile(delete=True) as temp_log:
            os.environ["TORCHINDUCTOR_WORKER_LOGPATH"] = temp_log.name
            pool = SubprocPool(2)
            try:
                pool.submit(operator.add, 100, 1)
                self.assertEqual(os.path.exists(temp_log.name), True)
            finally:
                # Always stop the worker processes, even on assertion failure.
                pool.shutdown()
    finally:
        # Restore (or remove) every variable we touched.
        for key, value in saved.items():
            if value is None:
                os.environ.pop(key, None)
            else:
                os.environ[key] = value
|
||||
|
||||
if __name__ == "__main__":
|
||||
from torch._inductor.test_case import run_tests
|
||||
|
@@ -145,24 +145,11 @@ class SubprocPool:
|
||||
f"--write-fd={str(subproc_write_fd)}",
|
||||
f"--torch-key={torch_key_str}",
|
||||
]
|
||||
mast_job_id = os.environ.get("MAST_HPC_JOB_NAME", None)
|
||||
global_rank = os.environ.get("ROLE_RANK", "0")
|
||||
worker_log_path = os.environ.get("TORCHINDUCTOR_WORKER_LOGPATH", config.worker_log_path)
|
||||
stdout_pipe = None
|
||||
stderr_pipe = None
|
||||
self.log_file = None
|
||||
|
||||
if mast_job_id is not None:
|
||||
log_loc = f"{worker_log_path}{global_rank}"
|
||||
self.log_file = open(log_loc, "w")
|
||||
elif config.worker_suppress_logging:
|
||||
local = False
|
||||
if config.worker_suppress_logging:
|
||||
log.info("Suppressing compile worker output due to config")
|
||||
self.log_file = open(os.devnull, "w")
|
||||
local = True
|
||||
|
||||
if self.log_file:
|
||||
stdout_pipe = self.log_file
|
||||
stderr_pipe = self.log_file
|
||||
|
||||
self.process = subprocess.Popen(
|
||||
cmd,
|
||||
env={
|
||||
@@ -177,10 +164,9 @@ class SubprocPool:
|
||||
"LD_LIBRARY_PATH": get_ld_library_path(),
|
||||
},
|
||||
pass_fds=(subproc_read_fd, subproc_write_fd),
|
||||
stdout=stdout_pipe,
|
||||
stderr=stderr_pipe,
|
||||
stdout=subprocess.DEVNULL if local else None,
|
||||
stderr=subprocess.DEVNULL if local else None,
|
||||
)
|
||||
|
||||
self.write_lock = threading.Lock()
|
||||
self.read_thread = threading.Thread(
|
||||
target=self._read_thread, name="InductorSubproc", daemon=True
|
||||
@@ -276,8 +262,6 @@ class SubprocPool:
|
||||
_send_msg(self.write_pipe, MsgHeader.SHUTDOWN)
|
||||
self.write_pipe.close()
|
||||
self.process.wait(300)
|
||||
if self.log_file:
|
||||
self.log_file.close()
|
||||
except OSError as e:
|
||||
log.warning("Ignored OSError in pool shutdown: %s", e)
|
||||
finally:
|
||||
|
@@ -81,9 +81,6 @@ disable_progress = True
|
||||
# Whether to enable printing the source code for each future
verbose_progress = False

# Path prefix for the dedicated compile-worker log file used by
# subproc_pool; the worker's global rank is appended directly to this
# string to form the final path. Defaults to a path only inside fbcode;
# None disables the dedicated log file.
# NOTE(review): is_fbcode() is assumed to be defined earlier in this
# config module — confirm.
worker_log_path = "/logs/dedicated_log_torch_compile_worker_rank" if is_fbcode() else None

# Precompilation timeout in seconds (60 * 60 == one hour).
precompilation_timeout_seconds: int = 60 * 60
||||
|
||||
|
Reference in New Issue
Block a user