Files
pytorch/test/inductor/test_compile_worker.py
clr 3144713325 subproc_pool: Add support for enabling quiesce via a timer (#166467)
This adds the capability to subproc pool to enable quiesce via a timer

Pull Request resolved: https://github.com/pytorch/pytorch/pull/166467
Approved by: https://github.com/masnesral
2025-11-04 17:37:41 +00:00

154 lines
4.2 KiB
Python

# Owner(s): ["module: inductor"]
import operator
import os
import tempfile
from threading import Event
import torch._inductor.config as config
from torch._inductor.compile_worker.subproc_pool import (
raise_testexc,
SubprocException,
SubprocPool,
)
from torch._inductor.compile_worker.timer import Timer
from torch._inductor.test_case import TestCase
from torch.testing._internal.common_utils import skipIfWindows
from torch.testing._internal.inductor_utils import HAS_CPU
class TestCompileWorker(TestCase):
def make_pool(self, size):
return SubprocPool(size)
@skipIfWindows(msg="pass_fds not supported on Windows.")
def test_basic_jobs(self):
pool = self.make_pool(2)
try:
a = pool.submit(operator.add, 100, 1)
b = pool.submit(operator.sub, 100, 1)
self.assertEqual(a.result(), 101)
self.assertEqual(b.result(), 99)
finally:
pool.shutdown()
@skipIfWindows(msg="pass_fds not supported on Windows.")
def test_exception(self):
pool = self.make_pool(2)
try:
a = pool.submit(raise_testexc)
with self.assertRaisesRegex(
SubprocException,
"torch._inductor.compile_worker.subproc_pool.TestException",
):
a.result()
finally:
pool.shutdown()
@skipIfWindows(msg="pass_fds not supported on Windows.")
def test_crash(self):
pool = self.make_pool(2)
try:
with self.assertRaises(Exception):
a = pool.submit(os._exit, 1)
a.result()
# Pool should still be usable after a crash
b = pool.submit(operator.add, 100, 1)
c = pool.submit(operator.sub, 100, 1)
self.assertEqual(b.result(), 101)
self.assertEqual(c.result(), 99)
finally:
pool.shutdown()
@skipIfWindows(msg="pass_fds not supported on Windows.")
def test_quiesce(self):
pool = self.make_pool(2)
try:
a = pool.submit(operator.add, 100, 1)
pool.quiesce()
pool.wakeup()
b = pool.submit(operator.sub, 100, 1)
self.assertEqual(a.result(), 101)
self.assertEqual(b.result(), 99)
finally:
pool.shutdown()
@skipIfWindows(msg="pass_fds not supported on Windows.")
def test_logging(self):
os.environ["MAST_HPC_JOB_NAME"] = "test_job"
os.environ["ROLE_RANK"] = "0"
with tempfile.NamedTemporaryFile(delete=True) as temp_log:
os.environ["TORCHINDUCTOR_WORKER_LOGPATH"] = temp_log.name
pool = self.make_pool(2)
try:
pool.submit(operator.add, 100, 1)
self.assertEqual(os.path.exists(temp_log.name), True)
finally:
pool.shutdown()
@config.patch("quiesce_async_compile_time", 0.1)
class TestCompileWorkerWithTimer(TestCompileWorker):
def make_pool(self, size):
return SubprocPool(size, quiesce=True)
class TestTimer(TestCase):
def test_basics(self):
done = Event()
def doit():
done.set()
t = Timer(0.1, doit)
t.sleep_time = 0.1
t.record_call()
self.assertTrue(done.wait(4))
t.quit()
def test_repeated_calls(self):
done = Event()
def doit():
done.set()
t = Timer(0.1, doit)
t.sleep_time = 0.1
for _ in range(10):
t.record_call()
self.assertTrue(done.wait(4))
done.clear()
t.quit()
def test_never_fires(self):
done = Event()
def doit():
done.set()
t = Timer(999, doit)
t.sleep_time = 0.1
t.record_call()
self.assertFalse(done.wait(4))
t.quit()
def test_spammy_calls(self):
done = Event()
def doit():
done.set()
t = Timer(1, doit)
t.sleep_time = 0.1
for _ in range(400):
t.record_call()
self.assertTrue(done.wait(4))
t.quit()
if __name__ == "__main__":
from torch._inductor.test_case import run_tests
if HAS_CPU:
run_tests()