mirror of
https://github.com/pytorch/pytorch.git
synced 2025-10-20 21:14:14 +08:00
This PR enables the profiler related UT to be device-agnostic. It instantiates the profiler UTs for different device types and enable them on XPU backend. Pull Request resolved: https://github.com/pytorch/pytorch/pull/134316 Approved by: https://github.com/etaf, https://github.com/aaronenyeshi, https://github.com/gujinghui
350 lines
12 KiB
Python
350 lines
12 KiB
Python
# Owner(s): ["oncall: profiler"]
|
|
|
|
import os
|
|
import unittest
|
|
from unittest import skipIf
|
|
|
|
import torch
|
|
import torch.utils.cpp_extension
|
|
from torch._environment import is_fbcode
|
|
from torch.testing._internal.common_utils import IS_WINDOWS, run_tests, TestCase
|
|
|
|
|
|
if is_fbcode():
|
|
import caffe2.test.profiler_test_cpp_thread_lib as cpp # @manual=//caffe2/test:profiler_test_cpp_thread_lib
|
|
else:
|
|
# cpp extensions use relative paths. Those paths are relative to
|
|
# this file, so we'll change the working directory temporarily
|
|
old_working_dir = os.getcwd()
|
|
os.chdir(os.path.dirname(os.path.abspath(__file__)))
|
|
|
|
cpp = torch.utils.cpp_extension.load(
|
|
name="profiler_test_cpp_thread_lib",
|
|
sources=[
|
|
"test_cpp_thread.cpp",
|
|
],
|
|
verbose=True,
|
|
)
|
|
|
|
# return the working directory (see setUp)
|
|
os.chdir(old_working_dir)
|
|
|
|
|
|
KinetoProfiler = None
|
|
IterationCount = 5
|
|
ActivateIteration = 2
|
|
device = None
|
|
|
|
|
|
def blueprint(text):
|
|
print(f"\33[34m{text}\33[0m")
|
|
|
|
|
|
# onIterationStart() will be called by C++ training engine in cpp_thread_test_lib.cpp
|
|
class PythonProfilerEventHandler(cpp.ProfilerEventHandler):
|
|
def onIterationStart(self, iteration: int) -> None:
|
|
global KinetoProfiler, IterationCount
|
|
# it is important to start the profiler on the same thread that step() is called
|
|
# and yes, onIterationStart() will always be called on the same thread
|
|
if iteration == 0:
|
|
# this also means step() starts on iteration 1, not 0
|
|
KinetoProfiler.start()
|
|
blueprint("starting kineto profiler")
|
|
elif iteration == IterationCount - 1:
|
|
KinetoProfiler.stop()
|
|
blueprint("stopping kineto profiler")
|
|
else:
|
|
blueprint("stepping kineto profiler")
|
|
KinetoProfiler.step()
|
|
|
|
def emulateTraining(self, iteration: int, thread_id: int) -> None:
|
|
global device
|
|
# blueprint(f"training iteration {iteration} in thread {thread_id}")
|
|
torch_device = getattr(torch, device)
|
|
assert hasattr(torch_device, "synchronize")
|
|
sync_func = torch_device.synchronize
|
|
|
|
with torch.autograd.profiler.record_function("user_function"):
|
|
a = torch.ones(1, device=device)
|
|
b = torch.ones(1, device=device)
|
|
torch.add(a, b).cpu()
|
|
sync_func()
|
|
|
|
|
|
class CppThreadTestCUDA(TestCase):
|
|
ThreadCount = 20 # set to 2 for debugging
|
|
EventHandler = None
|
|
TraceObject = None
|
|
|
|
@classmethod
|
|
def setUpClass(cls) -> None:
|
|
super(TestCase, cls).setUpClass()
|
|
CppThreadTestCUDA.EventHandler = PythonProfilerEventHandler()
|
|
cpp.ProfilerEventHandler.Register(CppThreadTestCUDA.EventHandler)
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
if not is_fbcode():
|
|
torch.testing._internal.common_utils.remove_cpp_extensions_build_root()
|
|
|
|
def setUp(self) -> None:
|
|
if not torch.cuda.is_available():
|
|
self.skipTest("Test machine does not have cuda")
|
|
global device
|
|
device = "cuda"
|
|
|
|
# this clears off events from initialization
|
|
self.start_profiler(False)
|
|
cpp.start_threads(1, IterationCount, False)
|
|
|
|
def start_profiler(self, profile_memory):
|
|
global KinetoProfiler
|
|
KinetoProfiler = torch.profiler.profile(
|
|
schedule=torch.profiler.schedule(
|
|
wait=1, warmup=1, active=ActivateIteration, repeat=1
|
|
),
|
|
on_trace_ready=self.set_trace,
|
|
with_stack=True,
|
|
profile_memory=profile_memory,
|
|
record_shapes=True,
|
|
)
|
|
|
|
def set_trace(self, trace_obj) -> None:
|
|
CppThreadTestCUDA.TraceObject = trace_obj
|
|
|
|
def assert_text(self, condition, text, msg):
|
|
if condition:
|
|
print(f"\33[32m{text}\33[0m")
|
|
else:
|
|
print(f"\33[31m{text}\33[0m")
|
|
self.assertTrue(condition, msg)
|
|
|
|
def check_trace(self, expected, mem=False) -> None:
|
|
blueprint("verifying trace")
|
|
event_list = CppThreadTestCUDA.TraceObject.events()
|
|
for key, values in expected.items():
|
|
count = values[0]
|
|
min_count = count * (ActivateIteration - 1)
|
|
device = values[1]
|
|
filtered = filter(
|
|
lambda ev: ev.name == key
|
|
and str(ev.device_type) == f"DeviceType.{device}",
|
|
event_list,
|
|
)
|
|
|
|
if mem:
|
|
actual = 0
|
|
for ev in filtered:
|
|
sev = str(ev)
|
|
has_cuda_memory_usage = (
|
|
sev.find("cuda_memory_usage=0 ") < 0
|
|
and sev.find("cuda_memory_usage=") > 0
|
|
)
|
|
if has_cuda_memory_usage:
|
|
actual += 1
|
|
self.assert_text(
|
|
actual >= min_count,
|
|
f"{key}: {actual} >= {min_count}",
|
|
"not enough event with cuda_memory_usage set",
|
|
)
|
|
else:
|
|
actual = len(list(filtered))
|
|
if count == 1: # test_without
|
|
count *= ActivateIteration
|
|
self.assert_text(
|
|
actual == count,
|
|
f"{key}: {actual} == {count}",
|
|
"baseline event count incorrect",
|
|
)
|
|
else:
|
|
self.assert_text(
|
|
actual >= min_count,
|
|
f"{key}: {actual} >= {min_count}",
|
|
"not enough event recorded",
|
|
)
|
|
|
|
@skipIf(
|
|
IS_WINDOWS,
|
|
"Failing on windows cuda, see https://github.com/pytorch/pytorch/pull/130037 for slightly more context",
|
|
)
|
|
def test_with_enable_profiler_in_child_thread_cuda(self) -> None:
|
|
self.start_profiler(False)
|
|
cpp.start_threads(self.ThreadCount, IterationCount, True)
|
|
self.check_trace(
|
|
{
|
|
"aten::add": [self.ThreadCount, "CPU"],
|
|
"user_function": [self.ThreadCount, "CUDA"],
|
|
}
|
|
)
|
|
|
|
@skipIf(
|
|
IS_WINDOWS,
|
|
"Failing on windows cuda, see https://github.com/pytorch/pytorch/pull/130037 for slightly more context",
|
|
)
|
|
def test_without_enable_profiler_in_child_thread_cuda(self) -> None:
|
|
self.start_profiler(False)
|
|
cpp.start_threads(self.ThreadCount, IterationCount, False)
|
|
self.check_trace(
|
|
{
|
|
"aten::add": [1, "CPU"],
|
|
"user_function": [1, "CUDA"],
|
|
}
|
|
)
|
|
|
|
@skipIf(
|
|
IS_WINDOWS,
|
|
"Failing on windows cuda, see https://github.com/pytorch/pytorch/pull/130037 for slightly more context",
|
|
)
|
|
def test_profile_memory_cuda(self) -> None:
|
|
self.start_profiler(True)
|
|
cpp.start_threads(self.ThreadCount, IterationCount, True)
|
|
self.check_trace(
|
|
{
|
|
"aten::add": [self.ThreadCount, "CPU"],
|
|
},
|
|
mem=True,
|
|
)
|
|
|
|
|
|
# Here duplicate the CppThreadTest to enable the xpu cases because the
|
|
# instantiate_device_type_tests will call class method setUpClass.
|
|
# In function setUpClass, the instantiated class(e.g CppThreadTestCPU, CppThreadTestXPU)
|
|
# needs to be called to get it member EventHandler, while in this period,
|
|
# the input class in argument cls is CppThreadTest, which is not defined any more.
|
|
# We cannot detect which instantiated class is being created in setUpClass, so duplicate here
|
|
# for enabling xpu test cases
|
|
class CppThreadTestXPU(TestCase):
|
|
ThreadCount = 20 # set to 2 for debugging
|
|
EventHandler = None
|
|
TraceObject = None
|
|
|
|
@classmethod
|
|
def setUpClass(cls) -> None:
|
|
super(TestCase, cls).setUpClass()
|
|
CppThreadTestXPU.EventHandler = PythonProfilerEventHandler()
|
|
cpp.ProfilerEventHandler.Register(CppThreadTestXPU.EventHandler)
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
if not is_fbcode():
|
|
torch.testing._internal.common_utils.remove_cpp_extensions_build_root()
|
|
|
|
def setUp(self) -> None:
|
|
if not torch.xpu.is_available():
|
|
self.skipTest("Test machine does not have xpu")
|
|
global device
|
|
device = "xpu"
|
|
|
|
# this clears off events from initialization
|
|
self.start_profiler(False)
|
|
cpp.start_threads(1, IterationCount, False)
|
|
|
|
def start_profiler(self, profile_memory):
|
|
global KinetoProfiler
|
|
KinetoProfiler = torch.profiler.profile(
|
|
schedule=torch.profiler.schedule(
|
|
wait=1, warmup=1, active=ActivateIteration, repeat=1
|
|
),
|
|
on_trace_ready=self.set_trace,
|
|
with_stack=True,
|
|
profile_memory=profile_memory,
|
|
record_shapes=True,
|
|
)
|
|
|
|
def set_trace(self, trace_obj) -> None:
|
|
CppThreadTestXPU.TraceObject = trace_obj
|
|
|
|
def assert_text(self, condition, text, msg):
|
|
if condition:
|
|
print(f"\33[32m{text}\33[0m")
|
|
else:
|
|
print(f"\33[31m{text}\33[0m")
|
|
self.assertTrue(condition, msg)
|
|
|
|
def check_trace(self, expected, mem=False) -> None:
|
|
blueprint("verifying trace")
|
|
event_list = CppThreadTestXPU.TraceObject.events()
|
|
for key, values in expected.items():
|
|
count = values[0]
|
|
min_count = count * (ActivateIteration - 1)
|
|
device = values[1]
|
|
filtered = filter(
|
|
lambda ev: ev.name == key
|
|
and str(ev.device_type) == f"DeviceType.{device}",
|
|
event_list,
|
|
)
|
|
|
|
if mem:
|
|
actual = 0
|
|
for ev in filtered:
|
|
sev = str(ev)
|
|
has_cuda_memory_usage = (
|
|
sev.find("xpu_memory_usage=0 ") < 0
|
|
and sev.find("xpu_memory_usage=") > 0
|
|
)
|
|
if has_cuda_memory_usage:
|
|
actual += 1
|
|
self.assert_text(
|
|
actual >= min_count,
|
|
f"{key}: {actual} >= {min_count}",
|
|
"not enough event with xpu_memory_usage set",
|
|
)
|
|
else:
|
|
actual = len(list(filtered))
|
|
if count == 1: # test_without
|
|
count *= ActivateIteration
|
|
self.assert_text(
|
|
actual == count,
|
|
f"{key}: {actual} == {count}",
|
|
"baseline event count incorrect",
|
|
)
|
|
else:
|
|
self.assert_text(
|
|
actual >= min_count,
|
|
f"{key}: {actual} >= {min_count}",
|
|
"not enough event recorded",
|
|
)
|
|
|
|
@unittest.skip(
|
|
reason="The XPU Profiler will not cover this case for now. Will support it in next period."
|
|
)
|
|
def test_with_enable_profiler_in_child_thread_xpu(self) -> None:
|
|
self.start_profiler(False)
|
|
cpp.start_threads(self.ThreadCount, IterationCount, True)
|
|
self.check_trace(
|
|
{
|
|
"aten::add": [self.ThreadCount, "CPU"],
|
|
"user_function": [self.ThreadCount, "XPU"],
|
|
}
|
|
)
|
|
|
|
@unittest.skip(
|
|
reason="The XPU Profiler will not cover this case for now. Will support it in next period."
|
|
)
|
|
def test_without_enable_profiler_in_child_thread_xpu(self) -> None:
|
|
self.start_profiler(False)
|
|
cpp.start_threads(self.ThreadCount, IterationCount, False)
|
|
self.check_trace(
|
|
{
|
|
"aten::add": [1, "CPU"],
|
|
"user_function": [1, "XPU"],
|
|
}
|
|
)
|
|
|
|
@unittest.skip(
|
|
reason="The XPU Profiler will not cover this case for now. Will support it in next period."
|
|
)
|
|
def test_profile_memory_xpu(self) -> None:
|
|
self.start_profiler(True)
|
|
cpp.start_threads(self.ThreadCount, IterationCount, True)
|
|
self.check_trace(
|
|
{
|
|
"aten::add": [self.ThreadCount, "CPU"],
|
|
},
|
|
mem=True,
|
|
)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
run_tests()
|