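# Utilities for summarizing torch.profiler results: per-op self time, CUDA
# launch queue depth over time, and idle-time attribution for each event.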
from collections import deque
from dataclasses import dataclass
from typing import Dict, List

from torch.profiler import DeviceType
from torch.autograd.profiler import profile
from torch.autograd import _KinetoEvent


@dataclass
class EventMetrics:
    duration_time_ns: int = 0
    self_time_ns: int = 0
    idle_time_ns: int = 0
    queue_depth: int = 0
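
    # Fraction of this event's duration that overlaps the profile's idle
    # intervals (idle_time_ns / duration_time_ns); 0.0 for zero-length events.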
    @property
    def fraction_idle_time(self):
        if self.duration_time_ns == 0:
            return 0.0
        return self.idle_time_ns / self.duration_time_ns


@dataclass
class Interval:
    start: int
    end: int
    queue_depth: int = 0


class EventKey:

    def __init__(self, event):
        self.event = event

    def __hash__(self):
        return hash(self.event.id)

    def __eq__(self, other):
        return self.event.id == other.event.id

    def __repr__(self):
        return f"<{self.event.name()} id={self.event.correlation_id}>"

    def intervals_overlap(self, intervals: List[Interval]):
        overlap_time = 0
        intervals = sorted(intervals, key=lambda x: x.start)
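        # Intervals are sorted and assumed disjoint; clip each one against the
        # event's [start_time_ns, end_time_ns) window and sum the overlaps.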
        for i, interval in enumerate(intervals):
            if i + 1 < len(intervals):
                assert interval.end <= intervals[
                    i + 1].start, "Intervals must be disjoint"
            overlap_start = max(self.event.start_time_ns, interval.start)
            overlap_end = min(self.event.end_time_ns, interval.end)

            if overlap_start < overlap_end:
                overlap_time += overlap_end - overlap_start
        return overlap_time


class BasicEvaluation:

    def __init__(self, prof: profile):
        self.profile = prof
        self.metrics: Dict[EventKey, EventMetrics] = {}
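        # Metrics are filled in three passes: per-op self time from the event
        # tree, then CUDA launch queue depth over time, then idle time derived
        # from the queue-depth timeline.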
        self.compute_self_time()
        self.event_keys = sorted((e for e in self.metrics.keys()),
                                 key=lambda x: x.event.start_time_ns)
        self.events = [e.event for e in self.event_keys]
        self.cuda_events: List[_KinetoEvent] = []
        self.queue_depth_list = self.compute_queue_depth()
        self.compute_idle_time()

    def compute_self_time(self):
        '''
        Computes each event's self time (total time minus time spent in child ops).
        '''
        assert (self.profile.kineto_results is not None)
        stack = deque(self.profile.kineto_results.experimental_event_tree())

        # standard iterating dfs
        while stack:
            curr_event = stack.pop()
            self_time = curr_event.duration_time_ns
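            # Subtract each direct child's duration; whatever remains is time
            # spent in this op itself. Children are pushed so the DFS visits
            # the whole tree.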
            for child_event in curr_event.children:
                self_time -= child_event.duration_time_ns
                stack.append(child_event)

            assert EventKey(
                curr_event
            ) not in self.metrics, f"Duplicate id: {curr_event.id}, {curr_event.name()}"
            self.metrics[EventKey(curr_event)] = EventMetrics(
                self_time_ns=self_time)
            self.metrics[EventKey(
                curr_event)].duration_time_ns = curr_event.duration_time_ns

    def compute_queue_depth(self):
        '''
        Computes queue_depth at each event. This calculates the queue depth data
        for all the events in the tree.
        Returns a list of Interval objects with the queue depth of CUDA launches
        and kernels over time.
        '''
        assert (self.profile.kineto_results is not None)
        cuda_event_list = self.profile.kineto_results.events()

        def is_cuda_launch_kernel(e):
            # TODO: find a better way to identify cudaLaunchKernel
            return e.name() == "cudaLaunchKernel"

        def is_cuda_kernel(e):
            # TODO: find a better way to identify CUDA Kernel
            return e.device_type() == DeviceType.CUDA and "mem" not in e.name(
            ).lower()

        cuda_launch_events = sorted(
            (e for e in cuda_event_list if is_cuda_launch_kernel(e)),
            key=lambda x: x.start_us())
        cuda_kernel_events = sorted(
            (e for e in cuda_event_list if is_cuda_kernel(e)),
            key=lambda x: x.start_us())

        self.cuda_events = sorted(cuda_launch_events + cuda_kernel_events,
                                  key=lambda x: x.start_us())
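
        # Map each cudaLaunchKernel event to the index of the GPU kernel it
        # spawned, matching launches and kernels by correlation id. Both lists
        # are time-sorted, so the search resumes from the last matched kernel.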
        kernel_mapping: Dict[_KinetoEvent, int] = {}
        last_mapped_kernel = 0
        for cuda_launch_event in cuda_launch_events:
            index = index_of_first_match(
                cuda_kernel_events,
                lambda x: x.linked_correlation_id(
                ) == cuda_launch_event.linked_correlation_id(),
                start=last_mapped_kernel)
            kernel_mapping[cuda_launch_event] = index
            last_mapped_kernel = index if index is not None else last_mapped_kernel

        current_kernel_index = 0
        spawned_kernel_index = -1

        all_events = cuda_launch_events + cuda_kernel_events + self.events
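
        # Kineto events report times in microseconds via start_us(), while
        # event-tree nodes report nanoseconds via start_time_ns; normalize both
        # to nanoseconds so the two kinds of events can share one timeline.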
        def new_old_event_comparator(event):
            if hasattr(event, "start_us"):
                return event.start_us() * 1000
            if hasattr(event, "start_time_ns"):
                return event.start_time_ns
            raise Exception("Unknown Event Type")

        queue_depth_list: List[Interval] = []
        all_events.sort(key=new_old_event_comparator)
        for event in all_events:
            # Find latest cuda kernel event
            if hasattr(event, "start_us"):
                start_time = event.start_us() * 1000
                end_time = (event.start_us() + event.duration_us()) * 1000
                # Find current spawned cuda kernel event
                if event in kernel_mapping and kernel_mapping[
                        event] is not None:
                    spawned_kernel_index = kernel_mapping[event]
            elif hasattr(event, "start_time_ns"):
                start_time = event.start_time_ns  # type: ignore[attr-defined]
                end_time = event.end_time_ns  # type: ignore[attr-defined]
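
            # Advance past kernels that have already started by this event's
            # start time; queue depth is then the number of kernels that have
            # been launched but have not yet begun executing.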
            while (current_kernel_index < len(cuda_kernel_events) and
                   (cuda_kernel_events[current_kernel_index].start_us()) * 1000
                   <= start_time):
                current_kernel_index += 1
            current_queue_depth = spawned_kernel_index - current_kernel_index + 1

            if hasattr(event, "start_us"):
                queue_depth_list.append(
                    Interval(start_time, end_time, current_queue_depth))
            elif hasattr(event, "start_time_ns"):
                self.metrics[EventKey(event)].queue_depth = current_queue_depth

        return queue_depth_list

    def compute_idle_time(self):
        '''
        Computes idle time of the profile.
        '''
        # Based on queue_depth_list, we can calculate idle time for all the events
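        # An idle interval starts when the queue drains to zero and ends when
        # new work is enqueued; each event's idle time is its overlap with
        # these intervals.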
        idle = False
        idle_start = 0
        idle_intervals: List[Interval] = []
        for data_point in self.queue_depth_list:
            if data_point.queue_depth == 0 and not idle:
                idle_start = data_point.end
                idle = True
            if data_point.queue_depth > 0 and idle:
                idle_intervals.append(Interval(idle_start, data_point.start))
                idle = False

        event_list = [e.event for e in self.metrics.keys()]
        for event in event_list:
            self.metrics[EventKey(event)].idle_time_ns = EventKey(
                event).intervals_overlap(idle_intervals)


def index_of_first_match(seq, predicate, start=0, end=None):
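    # Return the index of the first element of seq[start:end] that satisfies
    # predicate, or None if there is no match.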
    if end is None or end >= len(seq):
        end = len(seq)
    for i in range(start, end):
        if predicate(seq[i]):
            return i
    return None
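

# Example usage (a minimal sketch, not exercised by this module): BasicEvaluation
# expects the torch.autograd.profiler.profile object backing a finished
# torch.profiler.profile run; reaching it through `prof.profiler` below is an
# assumption about the profiler's internals rather than a public contract.
#
#     import torch
#     from torch.profiler import profile as kineto_profile, ProfilerActivity
#
#     model = torch.nn.Linear(128, 128).cuda()
#     inp = torch.randn(32, 128, device="cuda")
#     with kineto_profile(activities=[ProfilerActivity.CPU,
#                                     ProfilerActivity.CUDA]) as prof:
#         model(inp)
#
#     evaluation = BasicEvaluation(prof.profiler)
#     for key in evaluation.event_keys:
#         m = evaluation.metrics[key]
#         print(key, m.self_time_ns, m.queue_depth, m.fraction_idle_time)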