mirror of
https://github.com/pytorch/pytorch.git
synced 2025-10-20 21:14:14 +08:00
Summary: Writing torch.compile worked logs to dedicated_log_rank{RANK} if we're running on mast. ref: D79456310 (got reverted because of linter) Testing: Refer differential Revision: D79917440 Pull Request resolved: https://github.com/pytorch/pytorch/pull/160352 Approved by: https://github.com/masnesral
89 lines
2.7 KiB
Python
89 lines
2.7 KiB
Python
# Owner(s): ["module: inductor"]
|
|
import operator
|
|
import os
|
|
import tempfile
|
|
|
|
from torch._inductor.compile_worker.subproc_pool import (
|
|
raise_testexc,
|
|
SubprocException,
|
|
SubprocPool,
|
|
)
|
|
from torch._inductor.test_case import TestCase
|
|
from torch.testing._internal.common_utils import skipIfWindows
|
|
from torch.testing._internal.inductor_utils import HAS_CPU
|
|
|
|
|
|
class TestCompileWorker(TestCase):
|
|
@skipIfWindows(msg="pass_fds not supported on Windows.")
|
|
def test_basic_jobs(self):
|
|
pool = SubprocPool(2)
|
|
try:
|
|
a = pool.submit(operator.add, 100, 1)
|
|
b = pool.submit(operator.sub, 100, 1)
|
|
self.assertEqual(a.result(), 101)
|
|
self.assertEqual(b.result(), 99)
|
|
finally:
|
|
pool.shutdown()
|
|
|
|
@skipIfWindows(msg="pass_fds not supported on Windows.")
|
|
def test_exception(self):
|
|
pool = SubprocPool(2)
|
|
try:
|
|
a = pool.submit(raise_testexc)
|
|
with self.assertRaisesRegex(
|
|
SubprocException,
|
|
"torch._inductor.compile_worker.subproc_pool.TestException",
|
|
):
|
|
a.result()
|
|
finally:
|
|
pool.shutdown()
|
|
|
|
@skipIfWindows(msg="pass_fds not supported on Windows.")
|
|
def test_crash(self):
|
|
pool = SubprocPool(2)
|
|
try:
|
|
with self.assertRaises(Exception):
|
|
a = pool.submit(os._exit, 1)
|
|
a.result()
|
|
|
|
# Pool should still be usable after a crash
|
|
b = pool.submit(operator.add, 100, 1)
|
|
c = pool.submit(operator.sub, 100, 1)
|
|
self.assertEqual(b.result(), 101)
|
|
self.assertEqual(c.result(), 99)
|
|
finally:
|
|
pool.shutdown()
|
|
|
|
@skipIfWindows(msg="pass_fds not supported on Windows.")
|
|
def test_quiesce(self):
|
|
pool = SubprocPool(2)
|
|
try:
|
|
a = pool.submit(operator.add, 100, 1)
|
|
pool.quiesce()
|
|
pool.wakeup()
|
|
b = pool.submit(operator.sub, 100, 1)
|
|
self.assertEqual(a.result(), 101)
|
|
self.assertEqual(b.result(), 99)
|
|
finally:
|
|
pool.shutdown()
|
|
|
|
@skipIfWindows(msg="pass_fds not supported on Windows.")
|
|
def test_logging(self):
|
|
os.environ["MAST_HPC_JOB_NAME"] = "test_job"
|
|
os.environ["ROLE_RANK"] = "0"
|
|
with tempfile.NamedTemporaryFile(delete=True) as temp_log:
|
|
os.environ["TORCHINDUCTOR_WORKER_LOGPATH"] = temp_log.name
|
|
pool = SubprocPool(2)
|
|
try:
|
|
pool.submit(operator.add, 100, 1)
|
|
self.assertEqual(os.path.exists(temp_log.name), True)
|
|
finally:
|
|
pool.shutdown()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
from torch._inductor.test_case import run_tests
|
|
|
|
if HAS_CPU:
|
|
run_tests()
|