[BE][Easy] enable postponed annotations in tools (#129375)

Pull Request resolved: https://github.com/pytorch/pytorch/pull/129375
Approved by: https://github.com/malfet
This commit is contained in:
Xuehai Pan
2024-06-29 12:48:06 +08:00
committed by PyTorch MergeBot
parent 58f346c874
commit 8a67daf283
123 changed files with 1274 additions and 1053 deletions

View File

@ -1,6 +1,8 @@
from __future__ import annotations
import argparse
import os
from typing import cast, List, Optional, Tuple
from typing import cast
from ..util.setting import (
CompilerType,
@ -38,7 +40,7 @@ BLOCKED_PYTHON_TESTS = {
}
def initialization() -> Tuple[Option, TestList, List[str]]:
def initialization() -> tuple[Option, TestList, list[str]]:
# create folder if not exists
create_folders()
# add arguments
@ -77,7 +79,7 @@ def add_arguments_oss(parser: argparse.ArgumentParser) -> argparse.ArgumentParse
def parse_arguments(
parser: argparse.ArgumentParser,
) -> Tuple[Option, Optional[List[str]], Optional[List[str]], Optional[bool]]:
) -> tuple[Option, list[str] | None, list[str] | None, bool | None]:
# parse args
args = parser.parse_args()
# get option
@ -85,9 +87,7 @@ def parse_arguments(
return (options, args.interest_only, args.run_only, args.clean)
def get_test_list_by_type(
run_only: Optional[List[str]], test_type: TestType
) -> TestList:
def get_test_list_by_type(run_only: list[str] | None, test_type: TestType) -> TestList:
test_list: TestList = []
binary_folder = get_oss_binary_folder(test_type)
g = os.walk(binary_folder)
@ -106,7 +106,7 @@ def get_test_list_by_type(
return test_list
def get_test_list(run_only: Optional[List[str]]) -> TestList:
def get_test_list(run_only: list[str] | None) -> TestList:
test_list: TestList = []
# add c++ test list
test_list.extend(get_test_list_by_type(run_only, TestType.CPP))
@ -122,7 +122,7 @@ def get_test_list(run_only: Optional[List[str]]) -> TestList:
return test_list
def empty_list_if_none(arg_interested_folder: Optional[List[str]]) -> List[str]:
def empty_list_if_none(arg_interested_folder: list[str] | None) -> list[str]:
if arg_interested_folder is None:
return []
# if this argument is specified, just return itself
@ -134,7 +134,7 @@ def gcc_export_init() -> None:
create_folder(JSON_FOLDER_BASE_DIR)
def get_python_run_only(args_run_only: Optional[List[str]]) -> List[str]:
def get_python_run_only(args_run_only: list[str] | None) -> list[str]:
# if user specifies run-only option
if args_run_only:
return args_run_only
@ -144,7 +144,7 @@ def get_python_run_only(args_run_only: Optional[List[str]]) -> List[str]:
return ["run_test.py"]
else:
# for clang, some tests will result in too large intermediate files that can't be merged by llvm, we need to skip them
run_only: List[str] = []
run_only: list[str] = []
binary_folder = get_oss_binary_folder(TestType.PY)
g = os.walk(binary_folder)
for _, _, file_list in g:

View File

@ -1,6 +1,7 @@
from __future__ import annotations
import os
import subprocess
from typing import List, Optional
from ..util.setting import CompilerType, TestType, TOOLS_FOLDER
from ..util.utils import print_error, remove_file
@ -14,7 +15,7 @@ def get_oss_binary_folder(test_type: TestType) -> str:
)
def get_oss_shared_library() -> List[str]:
def get_oss_shared_library() -> list[str]:
lib_dir = os.path.join(get_pytorch_folder(), "build", "lib")
return [
os.path.join(lib_dir, lib)
@ -48,7 +49,7 @@ def get_pytorch_folder() -> str:
)
def detect_compiler_type() -> Optional[CompilerType]:
def detect_compiler_type() -> CompilerType | None:
# check if user specifies the compiler type
user_specify = os.environ.get("CXX", None)
if user_specify:
@ -76,7 +77,7 @@ def clean_up_gcda() -> None:
remove_file(item)
def get_gcda_files() -> List[str]:
def get_gcda_files() -> list[str]:
folder_has_gcda = os.path.join(get_pytorch_folder(), "build")
if os.path.isdir(folder_has_gcda):
# TODO use glob

View File

@ -1,7 +1,8 @@
from __future__ import annotations
import os
import subprocess
import time
from typing import List
from ..util.setting import (
JSON_FOLDER_BASE_DIR,
@ -25,7 +26,7 @@ from .utils import get_tool_path_by_platform, run_cpp_test
def create_corresponding_folder(
cur_path: str, prefix_cur_path: str, dir_list: List[str], new_base_folder: str
cur_path: str, prefix_cur_path: str, dir_list: list[str], new_base_folder: str
) -> None:
for dir_name in dir_list:
relative_path = convert_to_relative_path(
@ -70,7 +71,7 @@ def export_target(
merged_file: str,
json_file: str,
binary_file: str,
shared_library_list: List[str],
shared_library_list: list[str],
platform_type: TestPlatform,
) -> None:
if binary_file is None:

View File

@ -1,7 +1,8 @@
from __future__ import annotations
import os
import subprocess
import time
from typing import Dict
# gcc is only used in oss
from ..oss.utils import get_gcda_files, run_oss_python_test
@ -10,7 +11,7 @@ from ..util.utils import print_log, print_time
from .utils import run_cpp_test
def update_gzip_dict(gzip_dict: Dict[str, int], file_name: str) -> str:
def update_gzip_dict(gzip_dict: dict[str, int], file_name: str) -> str:
file_name = file_name.lower()
gzip_dict[file_name] = gzip_dict.get(file_name, 0) + 1
num = gzip_dict[file_name]
@ -34,7 +35,7 @@ def export() -> None:
# collect .gcda files
gcda_files = get_gcda_files()
# file name like utils.cpp may have same name in different folder
gzip_dict: Dict[str, int] = {}
gzip_dict: dict[str, int] = {}
for gcda_item in gcda_files:
# generate json.gz
subprocess.check_call(["gcov", "-i", gcda_item])

View File

@ -1,12 +1,14 @@
import typing as t
from __future__ import annotations
from typing import Any, NamedTuple
class CoverageRecord(t.NamedTuple):
class CoverageRecord(NamedTuple):
filepath: str
covered_lines: t.List[int]
uncovered_lines: t.Optional[t.List[int]] = None
covered_lines: list[int]
uncovered_lines: list[int] | None = None
def to_dict(self) -> t.Dict[str, t.Any]:
def to_dict(self) -> dict[str, Any]:
return {
"filepath": self.filepath,
"covered_lines": self.covered_lines,

View File

@ -1,4 +1,6 @@
from typing import Any, Dict, List, Set
from __future__ import annotations
from typing import Any
from .coverage_record import CoverageRecord
@ -10,7 +12,7 @@ class GcovCoverageParser:
of CoverageRecord(s).
"""
def __init__(self, llvm_coverage: Dict[str, Any]) -> None:
def __init__(self, llvm_coverage: dict[str, Any]) -> None:
self._llvm_coverage = llvm_coverage
@staticmethod
@ -24,17 +26,17 @@ class GcovCoverageParser:
return True
return False
def parse(self) -> List[CoverageRecord]:
def parse(self) -> list[CoverageRecord]:
# The JSON format is described in the gcov source code
# https://gcc.gnu.org/onlinedocs/gcc/Invoking-Gcov.html
records: List[CoverageRecord] = []
records: list[CoverageRecord] = []
for file_info in self._llvm_coverage["files"]:
filepath = file_info["file"]
if self._skip_coverage(filepath):
continue
# parse json file
covered_lines: Set[int] = set()
uncovered_lines: Set[int] = set()
covered_lines: set[int] = set()
uncovered_lines: set[int] = set()
for line in file_info["lines"]:
line_number = line["line_number"]
count = line["count"]

View File

@ -1,4 +1,6 @@
from typing import Any, Dict, List, Set, Tuple
from __future__ import annotations
from typing import Any
from .coverage_record import CoverageRecord
from .llvm_coverage_segment import LlvmCoverageSegment, parse_segments
@ -12,7 +14,7 @@ class LlvmCoverageParser:
"""
def __init__(self, llvm_coverage: Dict[str, Any]) -> None:
def __init__(self, llvm_coverage: dict[str, Any]) -> None:
self._llvm_coverage = llvm_coverage
@staticmethod
@ -28,13 +30,13 @@ class LlvmCoverageParser:
@staticmethod
def _collect_coverage(
segments: List[LlvmCoverageSegment],
) -> Tuple[List[int], List[int]]:
segments: list[LlvmCoverageSegment],
) -> tuple[list[int], list[int]]:
"""
Stateful parsing of coverage segments.
"""
covered_lines: Set[int] = set()
uncovered_lines: Set[int] = set()
covered_lines: set[int] = set()
uncovered_lines: set[int] = set()
prev_segment = LlvmCoverageSegment(1, 0, 0, 0, 0, None)
for segment in segments:
covered_range, uncovered_range = segment.get_coverage(prev_segment)
@ -45,10 +47,10 @@ class LlvmCoverageParser:
uncovered_lines.difference_update(covered_lines)
return sorted(covered_lines), sorted(uncovered_lines)
def parse(self, repo_name: str) -> List[CoverageRecord]:
def parse(self, repo_name: str) -> list[CoverageRecord]:
# The JSON format is described in the LLVM source code
# https://github.com/llvm-mirror/llvm/blob/master/tools/llvm-cov/CoverageExporterJson.cpp
records: List[CoverageRecord] = []
records: list[CoverageRecord] = []
for export_unit in self._llvm_coverage["data"]:
for file_info in export_unit["files"]:
filepath = file_info["filename"]

View File

@ -1,4 +1,6 @@
from typing import List, NamedTuple, Optional, Tuple
from __future__ import annotations
from typing import NamedTuple
class LlvmCoverageSegment(NamedTuple):
@ -7,7 +9,7 @@ class LlvmCoverageSegment(NamedTuple):
segment_count: int
has_count: int
is_region_entry: int
is_gap_entry: Optional[int]
is_gap_entry: int | None
@property
def has_coverage(self) -> bool:
@ -18,8 +20,8 @@ class LlvmCoverageSegment(NamedTuple):
return self.has_count > 0
def get_coverage(
self, prev_segment: "LlvmCoverageSegment"
) -> Tuple[List[int], List[int]]:
self, prev_segment: LlvmCoverageSegment
) -> tuple[list[int], list[int]]:
# Code adapted from testpilot.testinfra.runners.gtestcoveragerunner.py
if not prev_segment.is_executable:
return [], []
@ -32,12 +34,12 @@ class LlvmCoverageSegment(NamedTuple):
return (lines_range, []) if prev_segment.has_coverage else ([], lines_range)
def parse_segments(raw_segments: List[List[int]]) -> List[LlvmCoverageSegment]:
def parse_segments(raw_segments: list[list[int]]) -> list[LlvmCoverageSegment]:
"""
Creates LlvmCoverageSegment from a list of lists in llvm export json.
each segment is represented by 5-element array.
"""
ret: List[LlvmCoverageSegment] = []
ret: list[LlvmCoverageSegment] = []
for raw_segment in raw_segments:
assert (
len(raw_segment) == 5 or len(raw_segment) == 6

View File

@ -1,10 +1,13 @@
from __future__ import annotations
import os
import subprocess
from typing import Dict, IO, List, Set, Tuple
from typing import IO, Tuple
from ..oss.utils import get_pytorch_folder
from ..util.setting import SUMMARY_FOLDER_DIR, TestList, TestStatusType
CoverageItem = Tuple[str, float, int, int]
@ -16,7 +19,7 @@ def key_by_name(x: CoverageItem) -> str:
return x[0]
def is_intrested_file(file_path: str, interested_folders: List[str]) -> bool:
def is_intrested_file(file_path: str, interested_folders: list[str]) -> bool:
if "cuda" in file_path:
return False
if "aten/gen_aten" in file_path or "aten/aten_" in file_path:
@ -27,7 +30,7 @@ def is_intrested_file(file_path: str, interested_folders: List[str]) -> bool:
return False
def is_this_type_of_tests(target_name: str, test_set_by_type: Set[str]) -> bool:
def is_this_type_of_tests(target_name: str, test_set_by_type: set[str]) -> bool:
# tests are divided into three types: success / partial success / fail to collect coverage
for test in test_set_by_type:
if target_name in test:
@ -36,7 +39,7 @@ def is_this_type_of_tests(target_name: str, test_set_by_type: Set[str]) -> bool:
def print_test_by_type(
tests: TestList, test_set_by_type: Set[str], type_name: str, summary_file: IO[str]
tests: TestList, test_set_by_type: set[str], type_name: str, summary_file: IO[str]
) -> None:
print("Tests " + type_name + " to collect coverage:", file=summary_file)
for test in tests:
@ -48,8 +51,8 @@ def print_test_by_type(
def print_test_condition(
tests: TestList,
tests_type: TestStatusType,
interested_folders: List[str],
coverage_only: List[str],
interested_folders: list[str],
coverage_only: list[str],
summary_file: IO[str],
summary_type: str,
) -> None:
@ -77,10 +80,10 @@ def print_test_condition(
def line_oriented_report(
tests: TestList,
tests_type: TestStatusType,
interested_folders: List[str],
coverage_only: List[str],
covered_lines: Dict[str, Set[int]],
uncovered_lines: Dict[str, Set[int]],
interested_folders: list[str],
coverage_only: list[str],
covered_lines: dict[str, set[int]],
uncovered_lines: dict[str, set[int]],
) -> None:
with open(os.path.join(SUMMARY_FOLDER_DIR, "line_summary"), "w+") as report_file:
print_test_condition(
@ -119,13 +122,13 @@ def print_file_summary(
def print_file_oriented_report(
tests_type: TestStatusType,
coverage: List[CoverageItem],
coverage: list[CoverageItem],
covered_summary: int,
total_summary: int,
summary_file: IO[str],
tests: TestList,
interested_folders: List[str],
coverage_only: List[str],
interested_folders: list[str],
coverage_only: list[str],
) -> None:
coverage_percentage = print_file_summary(
covered_summary, total_summary, summary_file
@ -155,10 +158,10 @@ def print_file_oriented_report(
def file_oriented_report(
tests: TestList,
tests_type: TestStatusType,
interested_folders: List[str],
coverage_only: List[str],
covered_lines: Dict[str, Set[int]],
uncovered_lines: Dict[str, Set[int]],
interested_folders: list[str],
coverage_only: list[str],
covered_lines: dict[str, set[int]],
uncovered_lines: dict[str, set[int]],
) -> None:
with open(os.path.join(SUMMARY_FOLDER_DIR, "file_summary"), "w+") as summary_file:
covered_summary = 0
@ -193,7 +196,7 @@ def file_oriented_report(
)
def get_html_ignored_pattern() -> List[str]:
def get_html_ignored_pattern() -> list[str]:
return ["/usr/*", "*anaconda3/*", "*third_party/*"]

View File

@ -1,7 +1,9 @@
from __future__ import annotations
import json
import os
import time
from typing import Any, Dict, List, Set, Tuple
from typing import Any, TYPE_CHECKING
from ..util.setting import (
CompilerType,
@ -16,7 +18,6 @@ from ..util.utils import (
print_time,
related_to_test_list,
)
from .parser.coverage_record import CoverageRecord
from .parser.gcov_coverage_parser import GcovCoverageParser
from .parser.llvm_coverage_parser import LlvmCoverageParser
from .print_report import (
@ -26,16 +27,20 @@ from .print_report import (
)
if TYPE_CHECKING:
from .parser.coverage_record import CoverageRecord
# coverage_records: Dict[str, LineInfo] = {}
covered_lines: Dict[str, Set[int]] = {}
uncovered_lines: Dict[str, Set[int]] = {}
covered_lines: dict[str, set[int]] = {}
uncovered_lines: dict[str, set[int]] = {}
tests_type: TestStatusType = {"success": set(), "partial": set(), "fail": set()}
def transform_file_name(
file_path: str, interested_folders: List[str], platform: TestPlatform
file_path: str, interested_folders: list[str], platform: TestPlatform
) -> str:
remove_patterns: Set[str] = {".DEFAULT.cpp", ".AVX.cpp", ".AVX2.cpp"}
remove_patterns: set[str] = {".DEFAULT.cpp", ".AVX.cpp", ".AVX2.cpp"}
for pattern in remove_patterns:
file_path = file_path.replace(pattern, "")
# if user has specified interested folder
@ -54,7 +59,7 @@ def transform_file_name(
def is_intrested_file(
file_path: str, interested_folders: List[str], platform: TestPlatform
file_path: str, interested_folders: list[str], platform: TestPlatform
) -> bool:
ignored_patterns = ["cuda", "aten/gen_aten", "aten/aten_", "build/"]
if any(pattern in file_path for pattern in ignored_patterns):
@ -77,7 +82,7 @@ def is_intrested_file(
return True
def get_json_obj(json_file: str) -> Tuple[Any, int]:
def get_json_obj(json_file: str) -> tuple[Any, int]:
"""
Sometimes at the start of file llvm/gcov will complains "fail to find coverage data",
then we need to skip these lines
@ -102,7 +107,7 @@ def get_json_obj(json_file: str) -> Tuple[Any, int]:
return None, 2
def parse_json(json_file: str, platform: TestPlatform) -> List[CoverageRecord]:
def parse_json(json_file: str, platform: TestPlatform) -> list[CoverageRecord]:
print("start parse:", json_file)
json_obj, read_status = get_json_obj(json_file)
if read_status == 0:
@ -117,7 +122,7 @@ def parse_json(json_file: str, platform: TestPlatform) -> List[CoverageRecord]:
cov_type = detect_compiler_type(platform)
coverage_records: List[CoverageRecord] = []
coverage_records: list[CoverageRecord] = []
if cov_type == CompilerType.CLANG:
coverage_records = LlvmCoverageParser(json_obj).parse("fbcode")
# print(coverage_records)
@ -128,7 +133,7 @@ def parse_json(json_file: str, platform: TestPlatform) -> List[CoverageRecord]:
def parse_jsons(
test_list: TestList, interested_folders: List[str], platform: TestPlatform
test_list: TestList, interested_folders: list[str], platform: TestPlatform
) -> None:
g = os.walk(JSON_FOLDER_BASE_DIR)
@ -152,8 +157,8 @@ def parse_jsons(
def update_coverage(
coverage_records: List[CoverageRecord],
interested_folders: List[str],
coverage_records: list[CoverageRecord],
interested_folders: list[str],
platform: TestPlatform,
) -> None:
for item in coverage_records:
@ -187,8 +192,8 @@ def update_set() -> None:
def summarize_jsons(
test_list: TestList,
interested_folders: List[str],
coverage_only: List[str],
interested_folders: list[str],
coverage_only: list[str],
platform: TestPlatform,
) -> None:
start_time = time.time()

View File

@ -1,3 +1,5 @@
from __future__ import annotations
import os
from enum import Enum
from typing import Dict, List, Set

View File

@ -1,8 +1,10 @@
from __future__ import annotations
import os
import shutil
import sys
import time
from typing import Any, NoReturn, Optional
from typing import Any, NoReturn
from .setting import (
CompilerType,
@ -113,7 +115,7 @@ def get_test_name_from_whole_path(path: str) -> str:
return path[start + 1 : end]
def check_compiler_type(cov_type: Optional[CompilerType]) -> None:
def check_compiler_type(cov_type: CompilerType | None) -> None:
if cov_type is not None and cov_type in [CompilerType.GCC, CompilerType.CLANG]:
return
raise Exception( # noqa: TRY002