[lint] add basic lintrunner compatibility (#67110)

Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/67110

Adds support for using lintrunner with:
- clang-format
- clang-tidy
- flake8
- mypy

Test Plan: Imported from OSS

Reviewed By: driazati

Differential Revision: D32145555

Pulled By: suo

fbshipit-source-id: 2150348e26fba4ae738cd0b9684b2889ce0f1133
This commit is contained in:
Michael Suo
2021-11-03 12:34:02 -07:00
committed by Facebook GitHub Bot
parent 89c4e8c22b
commit 6df0d7d502
9 changed files with 1604 additions and 17 deletions

119
.lintrunner.toml Normal file
View File

@ -0,0 +1,119 @@
# FLAKE8: Python style checks, run through the flake8_linter.py adapter.
[[linter]]
name = 'FLAKE8'
include_patterns = ['**/*.py']
# Paths matching any of these patterns are left out of this linter.
exclude_patterns = [
'.git/**',
'build_code_analyzer',
'build_test_custom_build/**',
'build/**',
'caffe2/**',
'docs/caffe2/**',
'docs/cpp/src/**',
'docs/src/**',
'scripts/**',
'test/generated_type_hints_smoketest.py',
'third_party/**',
'torch/include/**',
'torch/lib/**',
'venv/**',
'**/*.pyi',
]
# Adapter command line. The adapter's argparse is created with
# fromfile_prefix_chars="@", so the `@...` argument is read as a file of
# arguments. Presumably lintrunner replaces {{PATHSFILE}} with the path of a
# file listing the paths to lint -- confirm against the lintrunner docs.
args = [
'python3',
'tools/linter/adapters/flake8_linter.py',
'--binary=flake8',
'--',
'@{{PATHSFILE}}'
]
# CLANGFORMAT: C++ formatting, run through the clangformat_linter.py adapter.
# Only the directories listed below are covered.
[[linter]]
name = 'CLANGFORMAT'
include_patterns = [
'c10/**/*.h',
'c10/**/*.cpp',
'torch/csrc/jit/**/*.h',
'torch/csrc/jit/**/*.cpp',
'torch/csrc/deploy/**/*.h',
'torch/csrc/deploy/**/*.cpp',
'test/cpp/jit/**/*.h',
'test/cpp/jit/**/*.cpp',
'test/cpp/tensorexpr/**/*.h',
'test/cpp/tensorexpr/**/*.cpp',
]
exclude_patterns = []
# `--binary` names the clang-format executable the adapter invokes per file.
args = [
'python3',
'tools/linter/adapters/clangformat_linter.py',
'--binary=clang-format',
'--',
'@{{PATHSFILE}}'
]
# MYPY: type checking of every *.py file, run through the mypy_linter.py adapter.
[[linter]]
name = 'MYPY'
include_patterns = ['**/*.py']
exclude_patterns = []
# `--binary` names the mypy executable the adapter invokes per file.
args = [
'python3',
'tools/linter/adapters/mypy_linter.py',
'--binary=mypy',
'--',
'@{{PATHSFILE}}'
]
# CLANGTIDY: static analysis of every *.cpp file, run through clangtidy_linter.py.
[[linter]]
name = 'CLANGTIDY'
include_patterns = ['**/*.cpp']
exclude_patterns = []
# Setup command: clangtidy_init.py downloads a clang-tidy binary into
# `.clang-tidy-bin/clang-tidy`, the same path that `args` below passes as
# `--binary`. The adapter treats the value "0" for --dry_run as "really
# download"; presumably lintrunner fills in {{DRYRUN}} -- confirm.
init_args = [
'python3',
'tools/linter/adapters/clangtidy_init.py',
'--dry_run={{DRYRUN}}',
'--output_dir=.clang-tidy-bin',
'--output_name=clang-tidy',
]
# `--build_dir` is forwarded to clang-tidy as `-p` (location of
# compile_commands.json, per the adapter's --build_dir help text).
args = [
'python3',
'tools/linter/adapters/clangtidy_linter.py',
'--binary=.clang-tidy-bin/clang-tidy',
'--build_dir=./build',
'--',
'@{{PATHSFILE}}'
]
# TYPEIGNORE: grep-based check that flags `# type: ignore` comments which are
# not immediately followed by `[` (the pattern uses a negative lookahead).
[[linter]]
name = 'TYPEIGNORE'
include_patterns = ['**/*.py', '**/*.pyi']
exclude_patterns = ['test/test_jit.py']
# grep_linter.py passes --pattern to `grep -P` and reports every matching line
# using --linter_name, --error_name and --error_description.
args = [
'python3',
'tools/linter/adapters/grep_linter.py',
'--pattern=# type:\s*ignore(?!\[)',
'--linter_name=TYPEIGNORE',
'--error_name=unqualified type: ignore',
"""--error_description=\
This line has an unqualified `type: ignore`; \
please convert it to `type: ignore[xxxx]`\
""",
'--',
'@{{PATHSFILE}}'
]
# NOQA: grep-based check that flags `# noqa` comments which do not name the
# specific error code being suppressed (e.g. `# noqa: E501`).
[[linter]]
name = 'NOQA'
include_patterns = ['**/*.py', '**/*.pyi']
exclude_patterns = ['caffe2/**']
# The pattern matches `# noqa` unless it is followed by `: ` and an error code
# (uppercase letters plus three digits). The linter name must be NOQA so that
# findings are not reported under the TYPEIGNORE linter.
args = [
'python3',
'tools/linter/adapters/grep_linter.py',
'--pattern=# noqa(?!: [A-Z]+\d{3})',
'--linter_name=NOQA',
'--error_name=unqualified noqa',
"""--error_description=\
This line has an unqualified `noqa`; \
please convert it to `noqa: XXXX`\
""",
'--',
'@{{PATHSFILE}}'
]

View File

@ -0,0 +1,10 @@
# lintrunner adapters
These files adapt our various linters to work with `lintrunner`.
## Adding a new linter
1. init and linter
2. {{DRYRUN}} and {{PATHSFILE}}
3. never exit uncleanly
4. Communication protocol
5. Self-contained

View File

@ -0,0 +1,235 @@
import argparse
import concurrent.futures
import json
import logging
import os
import subprocess
import sys
import time
from enum import Enum
from typing import Any, List, NamedTuple, Optional
# True when running on Windows. Used in this file to run commands through the
# shell, to normalize the binary path, and to convert paths in as_posix().
IS_WINDOWS: bool = os.name == "nt"
def eprint(*args: Any, **kwargs: Any) -> None:
    """Print to standard error and flush immediately.

    Takes the same arguments as ``print``; ``file`` and ``flush`` are supplied
    here, so passing either of them raises ``TypeError``.
    """
    stream = sys.stderr
    print(*args, file=stream, flush=True, **kwargs)
class LintSeverity(str, Enum):
    """Severity levels that can be attached to a LintMessage.

    Mixing in ``str`` makes every member a string equal to its value.
    """

    ERROR = "error"
    WARNING = "warning"
    ADVICE = "advice"
    DISABLED = "disabled"
class LintMessage(NamedTuple):
    """One lint finding (or adapter failure); main() prints each as a JSON object."""

    # File the message refers to.
    path: str
    # Line of the finding, or None when it is not tied to a line.
    line: Optional[int]
    # Column of the finding, or None when it is not tied to a column.
    char: Optional[int]
    # Linter identifier; this adapter always uses "CLANGFORMAT".
    code: str
    severity: LintSeverity
    # Kind of finding; this adapter uses "timeout", "command-failed", "format".
    name: str
    # File contents before / after formatting; None for failure messages.
    original: Optional[str]
    replacement: Optional[str]
    # Human-readable explanation.
    description: Optional[str]
    # NOTE(review): consumed by lintrunner; presumably tells it to report the
    # message even when the affected lines were not changed -- confirm.
    bypassChangedLineFiltering: Optional[bool]
def as_posix(name: str) -> str:
    """Return *name* with backslashes replaced by "/" on Windows; unchanged elsewhere."""
    if not IS_WINDOWS:
        return name
    return name.replace("\\", "/")
def _run_command(
    args: List[str],
    *,
    timeout: int,
) -> "subprocess.CompletedProcess[bytes]":
    """Run *args* once, capturing stdout and stderr as bytes.

    Raises ``subprocess.TimeoutExpired`` after *timeout* seconds and
    ``subprocess.CalledProcessError`` on a non-zero exit status. The command
    line and its wall-clock duration are logged at DEBUG level.
    """
    logging.debug("$ %s", " ".join(args))
    started = time.monotonic()
    try:
        completed = subprocess.run(
            args,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            shell=IS_WINDOWS,  # So batch scripts are found.
            timeout=timeout,
            check=True,
        )
        return completed
    finally:
        # Logged on success and on failure alike.
        elapsed_ms = (time.monotonic() - started) * 1000
        logging.debug("took %dms", elapsed_ms)
def run_command(
    args: List[str],
    *,
    retries: int,
    timeout: int,
) -> "subprocess.CompletedProcess[bytes]":
    """Run *args*, retrying up to *retries* times when the command times out.

    Only ``subprocess.TimeoutExpired`` triggers a retry (after a one second
    pause); once the retries are used up the last timeout is re-raised. Any
    other exception from ``_run_command`` propagates immediately.
    """
    attempts_made = 0
    while True:
        try:
            return _run_command(args, timeout=timeout)
        except subprocess.TimeoutExpired as err:
            if attempts_made == retries:
                raise err
            attempts_made += 1
            logging.warning(
                "(%s/%s) Retrying because command failed with: %r",
                attempts_made,
                retries,
                err,
            )
            time.sleep(1)
def check_file(
    filename: str,
    binary: str,
    retries: int,
    timeout: int,
) -> List[LintMessage]:
    """Run clang-format on *filename* and report whether it would change it.

    Returns an empty list when the formatter output equals the file contents,
    one "format" message carrying the original and the formatted text when it
    differs, or one failure message when reading the file or running the
    command fails.
    """

    def failure(
        severity: LintSeverity, name: str, description: str
    ) -> List[LintMessage]:
        # All failure messages share everything except these three fields.
        return [
            LintMessage(
                path=filename,
                line=None,
                char=None,
                code="CLANGFORMAT",
                severity=severity,
                name=name,
                original=None,
                replacement=None,
                description=description,
                bypassChangedLineFiltering=None,
            )
        ]

    try:
        with open(filename, "rb") as source:
            original = source.read()
        proc = run_command(
            [binary, filename],
            retries=retries,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return failure(
            LintSeverity.ERROR,
            "timeout",
            "clang-format timed out while trying to process a file. "
            "Please report an issue in pytorch/pytorch with the "
            "label 'module: lint'",
        )
    except (OSError, subprocess.CalledProcessError) as err:
        if isinstance(err, subprocess.CalledProcessError):
            # The command ran but exited non-zero: show what it printed.
            details = (
                "COMMAND (exit code {returncode})\n"
                "{command}\n\n"
                "STDERR\n{stderr}\n\n"
                "STDOUT\n{stdout}"
            ).format(
                returncode=err.returncode,
                command=" ".join(as_posix(x) for x in err.cmd),
                stderr=err.stderr.decode("utf-8").strip() or "(empty)",
                stdout=err.stdout.decode("utf-8").strip() or "(empty)",
            )
        else:
            details = f"Failed due to {err.__class__.__name__}:\n{err}"
        return failure(LintSeverity.ADVICE, "command-failed", details)

    formatted = proc.stdout
    if formatted == original:
        return []

    return [
        LintMessage(
            path=filename,
            line=1,
            char=1,
            code="CLANGFORMAT",
            severity=LintSeverity.WARNING,
            name="format",
            original=original.decode("utf-8"),
            replacement=formatted.decode("utf-8"),
            description="See https://clang.llvm.org/docs/ClangFormat.html.\nRun `lintrunner -a` to apply this patch.",
            bypassChangedLineFiltering=True,
        )
    ]
def main() -> None:
    """Command-line entry point of the clang-format adapter.

    Parses the arguments (an ``@file`` argument is expanded into the arguments
    it contains), checks every path in a thread pool and prints each resulting
    LintMessage to stdout as one JSON object per line.
    """
    parser = argparse.ArgumentParser(
        description="Format files with clang-format.",
        fromfile_prefix_chars="@",
    )
    parser.add_argument("--binary", required=True, help="clang-format binary path")
    parser.add_argument(
        "--retries", default=3, type=int, help="times to retry timed out clang-format"
    )
    parser.add_argument(
        "--timeout", default=90, type=int, help="seconds to wait for clang-format"
    )
    parser.add_argument("--verbose", action="store_true", help="verbose logging")
    parser.add_argument("filenames", nargs="+", help="paths to lint")
    args = parser.parse_args()

    if args.verbose:
        log_level = logging.NOTSET
    elif len(args.filenames) < 1000:
        log_level = logging.DEBUG
    else:
        log_level = logging.INFO
    logging.basicConfig(
        format="<%(threadName)s:%(levelname)s> %(message)s",
        level=log_level,
        stream=sys.stderr,
    )

    binary = args.binary
    if IS_WINDOWS:
        binary = os.path.normpath(binary)

    with concurrent.futures.ThreadPoolExecutor(
        max_workers=os.cpu_count(),
        thread_name_prefix="Thread",
    ) as executor:
        # Map each future back to its path so failures can name the file.
        futures = {}
        for path in args.filenames:
            pending = executor.submit(
                check_file, path, binary, args.retries, args.timeout
            )
            futures[pending] = path
        for finished in concurrent.futures.as_completed(futures):
            try:
                for lint_message in finished.result():
                    print(json.dumps(lint_message._asdict()), flush=True)
            except Exception:
                logging.critical('Failed at "%s".', futures[finished])
                raise


if __name__ == "__main__":
    main()

View File

@ -0,0 +1,220 @@
import platform
import argparse
import sys
import stat
import hashlib
import subprocess
import os
import urllib.request
import urllib.error
import pathlib
from typing import Dict
# String representing the host platform (e.g. Linux, Darwin).
HOST_PLATFORM = platform.system()
# PyTorch directory root
# Resolved at import time by asking git for the top level of the enclosing
# working tree; check=True makes a failing `git` raise CalledProcessError.
result = subprocess.run(
    ["git", "rev-parse", "--show-toplevel"], stdout=subprocess.PIPE, check=True,
)
PYTORCH_ROOT = result.stdout.decode("utf-8").strip()
# Directory containing the reference hash files named in PLATFORM_TO_HASH.
HASH_PATH = pathlib.Path(PYTORCH_ROOT) / "tools" / "linter" / "install" / "hashes"
def compute_file_sha256(path: str) -> str:
    """Compute the SHA256 hash of a file and return it as a hex string.

    Returns an empty string when *path* does not exist. The file is read in
    fixed-size chunks so memory use stays bounded even for a binary that
    contains no newline bytes.
    """
    # If the file doesn't exist, return an empty string.
    if not os.path.exists(path):
        return ""

    digest = hashlib.sha256()

    # Open the file in binary mode and hash it chunk by chunk.
    with open(path, "rb") as f:
        while True:
            chunk = f.read(1 << 20)
            if not chunk:
                break
            digest.update(chunk)

    # Return the hash as a hexadecimal string.
    return digest.hexdigest()
def report_download_progress(
    chunk_number: int, chunk_size: int, file_size: int
) -> None:
    """
    Pretty printer for file download progress.

    Has the signature of a ``urllib.request.urlretrieve`` report hook. Nothing
    is printed when the total size is unknown (-1) or zero, which also avoids
    dividing by zero for an empty download.
    """
    if file_size > 0:
        # Cap at 100%: the last chunk usually overshoots the file size.
        percent = min(1, (chunk_number * chunk_size) / file_size)
        bar = "#" * int(64 * percent)
        sys.stdout.write("\r0% |{:<64}| {}%".format(bar, int(percent * 100)))
def download_bin(name: str, output_dir: str, platform_to_url: Dict[str, str], dry_run: bool) -> bool:
    """
    Fetch the binary for the host platform into ``output_dir/name``.

    Returns False when the host platform has no entry in *platform_to_url* or
    when the download fails with a URLError, True otherwise. In dry-run mode
    nothing is downloaded; the intended action is only printed.
    """
    supported = HOST_PLATFORM in platform_to_url
    if not supported:
        print(f"Unsupported platform: {HOST_PLATFORM}")
        return False

    url = platform_to_url[HOST_PLATFORM]
    filename = os.path.join(output_dir, name)

    if dry_run:
        print(f"DRY RUN: Would download {url} to {filename}")
        return True

    # Try to download binary.
    print(f"Downloading {name} to {output_dir}")
    succeeded = True
    try:
        urllib.request.urlretrieve(
            url,
            filename,
            # Only draw the progress bar on an interactive terminal.
            reporthook=report_download_progress if sys.stdout.isatty() else None,
        )
    except urllib.error.URLError as e:
        print(f"Error downloading {filename}: {e}")
        succeeded = False
    finally:
        # Terminate the progress-bar line whether or not the download worked.
        print()
    return succeeded
def download(
    name: str,
    output_dir: str,
    platform_to_url: Dict[str, str],
    platform_to_hash: Dict[str, str],
    dry_run: bool,
) -> bool:
    """
    Download a platform-appropriate binary if one doesn't already exist at the expected location and verifies
    that it is the right binary by checking its SHA256 hash against the expected hash.

    Returns True when the binary is in place and verified (or, in dry-run
    mode, when every step that would run was reported), False otherwise. A
    binary whose hash does not match the reference is deleted.
    """
    output_path = os.path.join(output_dir, name)
    if not os.path.exists(output_dir):
        # If the directory doesn't exist, try to create it.
        if dry_run:
            print(f"DRY RUN: would create directory for {name} binary: {output_dir}")
        else:
            try:
                os.mkdir(output_dir)
            except OSError:
                print(f"Unable to create directory for {name} binary: {output_dir}")
                return False
            else:
                # Report success only when mkdir actually succeeded.
                print(f"Created directory {output_dir} for {name} binary")

        # If the directory didn't exist, neither did the binary, so download it.
        if not download_bin(name, output_dir, platform_to_url, dry_run):
            return False
    elif not os.path.exists(output_path):
        # If the directory exists but the binary doesn't, download it.
        if not download_bin(name, output_dir, platform_to_url, dry_run):
            return False
    else:
        print(f"Found pre-existing {name} binary, skipping download")

    # Now that the binary is where it should be, hash it.
    actual_bin_hash = compute_file_sha256(output_path)

    # If the host platform is not in platform_to_hash, it is unsupported.
    if HOST_PLATFORM not in platform_to_hash:
        print(f"Unsupported platform: {HOST_PLATFORM}")
        return False

    # This is the path to the file containing the reference hash.
    hashpath = os.path.join(PYTORCH_ROOT, platform_to_hash[HOST_PLATFORM])

    if not os.path.exists(hashpath):
        print("Unable to find reference binary hash")
        return False

    # Load the reference hash and compare the actual hash to it.
    if dry_run:
        # We didn't download anything, just bail
        return True

    with open(hashpath, "r") as f:
        reference_bin_hash = f.readline().strip()

    print(f"Reference Hash: {reference_bin_hash}")
    print(f"Actual Hash: {repr(actual_bin_hash)}")

    if reference_bin_hash != actual_bin_hash:
        print("The downloaded binary is not what was expected!")
        print(f"Downloaded hash: {repr(actual_bin_hash)} vs expected {reference_bin_hash}")

        # Err on the side of caution and try to delete the downloaded binary.
        try:
            os.unlink(output_path)
            print("The binary has been deleted just to be safe")
        except OSError as e:
            print(f"Failed to delete binary: {e}")
            print("Delete this binary as soon as possible and do not execute it!")

        return False

    # Make sure the binary is executable.
    mode = os.stat(output_path).st_mode
    mode |= stat.S_IXUSR
    os.chmod(output_path, mode)
    print(f"Using {name} located at {output_path}")

    return True
# Download location of the clang-tidy binary, keyed by platform.system() value.
PLATFORM_TO_URL = {
    "Linux": "https://oss-clang-format.s3.us-east-2.amazonaws.com/linux64/clang-tidy",
    "Darwin": "https://oss-clang-format.s3.us-east-2.amazonaws.com/macos/clang-tidy",
}
# Path of the file whose first line is the expected SHA256 hex digest of the
# binary, keyed by platform.system() value.
PLATFORM_TO_HASH = {
    "Linux": os.path.join(HASH_PATH, "clang-tidy-linux64"),
    "Darwin": os.path.join(HASH_PATH, "clang-tidy-macos"),
}
# NOTE(review): not referenced by the command-line entry point below, which
# takes the output location from --output_dir / --output_name.
OUTPUT_DIR = os.path.join(PYTORCH_ROOT, ".clang-tidy-bin")
INSTALLATION_PATH = os.path.join(OUTPUT_DIR, "clang-tidy")
# Command-line entry point: download (or, in dry-run mode, describe the
# download of) the clang-tidy binary and exit with status 1 on failure.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="downloads clang-tidy",
    )
    parser.add_argument(
        "--output_dir",
        required=True,
        help="place to put the binary",
    )
    parser.add_argument(
        "--output_name",
        required=True,
        help="name of binary",
    )
    parser.add_argument(
        "--dry_run",
        # "0" means "really download". Using it as the default keeps an
        # omitted flag from silently turning the run into a dry run.
        default="0",
        help="do not download, just print what would be done",
    )
    args = parser.parse_args()
    # Any value other than "0" enables dry-run mode.
    args.dry_run = args.dry_run != "0"

    ok = download(args.output_name, args.output_dir, PLATFORM_TO_URL, PLATFORM_TO_HASH, args.dry_run)
    if not ok:
        print(f"Failed to download clang-tidy binary from {PLATFORM_TO_URL}")
        # sys.exit is always available; the bare `exit` helper comes from `site`.
        sys.exit(1)

View File

@ -0,0 +1,263 @@
import argparse
import concurrent.futures
import json
import logging
import os
import re
import shutil
import subprocess
import sys
import time
from enum import Enum
from pathlib import Path
from typing import Any, List, NamedTuple, Optional, Pattern
# True when running on Windows; consulted by as_posix().
IS_WINDOWS: bool = os.name == "nt"
def eprint(*args: Any, **kwargs: Any) -> None:
    """Print to standard error and flush immediately.

    Takes the same arguments as ``print``; ``file`` and ``flush`` are supplied
    here, so passing either of them raises ``TypeError``.
    """
    stream = sys.stderr
    print(*args, file=stream, flush=True, **kwargs)
class LintSeverity(str, Enum):
    """Severity levels that can be attached to a LintMessage.

    Mixing in ``str`` makes every member a string equal to its value.
    """

    ERROR = "error"
    WARNING = "warning"
    ADVICE = "advice"
    DISABLED = "disabled"
class LintMessage(NamedTuple):
    """One lint finding (or adapter failure); main() prints each as a JSON object."""

    # File the message refers to ("<none>" when the binary itself is missing).
    path: str
    # Line of the finding, or None when it is not tied to a line.
    line: Optional[int]
    # Column of the finding, or None when it is not tied to a column.
    char: Optional[int]
    # Linter identifier; this adapter always uses "CLANGTIDY".
    code: str
    severity: LintSeverity
    # Bracketed check name reported by clang-tidy, or "command-failed".
    name: str
    # Always None in this adapter.
    original: Optional[str]
    replacement: Optional[str]
    # Human-readable explanation.
    description: Optional[str]
    # NOTE(review): consumed by lintrunner; always None here -- semantics to
    # be confirmed against the lintrunner docs.
    bypassChangedLineFiltering: Optional[bool]
def as_posix(name: str) -> str:
    """Return *name* with backslashes replaced by "/" on Windows; unchanged elsewhere."""
    if not IS_WINDOWS:
        return name
    return name.replace("\\", "/")
# Matches one diagnostic line of clang-tidy output, for example:
# c10/core/DispatchKey.cpp:281:26: error: 'k' used after it was moved [bugprone-use-after-move]
# Named groups: file, line, column (optional, may be negative), severity,
# message, and code (the trailing bracketed check name, brackets included).
# Compiled with (?m) so ^/$ match per line and (?x) so whitespace in the
# pattern is ignored.
RESULTS_RE: Pattern[str] = re.compile(
    r"""(?mx)
^
(?P<file>.*?):
(?P<line>\d+):
(?:(?P<column>-?\d+):)?
\s(?P<severity>\S+?):?
\s(?P<message>.*)
\s(?P<code>\[.*\])
$
"""
)
def run_command(
    args: List[str],
) -> "subprocess.CompletedProcess[bytes]":
    """Run *args*, capturing stdout and stderr as bytes.

    A non-zero exit status does not raise. The command line and its
    wall-clock duration are logged at DEBUG level.
    """
    logging.debug("$ %s", " ".join(args))
    started = time.monotonic()
    try:
        completed = subprocess.run(
            args,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=False,
        )
        return completed
    finally:
        elapsed_ms = (time.monotonic() - started) * 1000
        logging.debug("took %dms", elapsed_ms)
# Maps the severity word captured by RESULTS_RE to a LintSeverity. Words not
# listed here fall back to LintSeverity.ERROR in check_file().
# NOTE(review): the previous comment here ('Severity is either "error" or
# "note": https://git.io/JiLOP') did not match this mapping and appears to
# have been copied from the mypy adapter.
severities = {
    "error": LintSeverity.ERROR,
    "warning": LintSeverity.WARNING,
}
def clang_search_dirs() -> List[str]:
    """Return the include search directories reported by a system compiler.

    Runs the first available compiler among clang, gcc, cpp and cc in verbose
    preprocessing mode and collects the directories it lists between the first
    "search starts here:" line and "End of search list." on stderr.

    Raises:
        RuntimeError: if none of the candidate compilers is on PATH.
        subprocess.CalledProcessError: if the compiler exits non-zero.
    """
    # Compilers are ordered based on fallback preference
    # We pick the first one that is available on the system
    candidates = ["clang", "gcc", "cpp", "cc"]
    compiler = next((c for c in candidates if shutil.which(c) is not None), None)
    if compiler is None:
        # Name the candidates that were tried, not the (empty) filtered list.
        raise RuntimeError(f"None of {candidates} were found")

    result = subprocess.run(
        [compiler, "-E", "-x", "c++", "-", "-v"],
        stdin=subprocess.DEVNULL,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        check=True,
    )
    stderr = result.stderr.decode().strip().split("\n")
    search_start = r"#include.*search starts here:"
    search_end = r"End of search list."

    append_path = False
    search_paths = []
    for line in stderr:
        if re.match(search_start, line):
            # A later "search starts here:" header is skipped, so the
            # directories of every section end up in one list.
            append_path = True
        elif re.match(search_end, line):
            break
        elif append_path:
            search_paths.append(line.strip())

    return search_paths
# Extra include directories handed to clang-tidy, each as a pair of arguments
# `--extra-arg -I<dir>`. This runs at import time and therefore invokes a
# compiler through clang_search_dirs() as soon as the module is loaded.
include_args = []
include_dir = ["/usr/lib/llvm-11/include/openmp"] + clang_search_dirs()
for dir in include_dir:
    include_args += ["--extra-arg", f"-I{dir}"]
def check_file(
    filename: str,
    binary: str,
    build_dir: str,
) -> List[LintMessage]:
    """Run clang-tidy on *filename* and convert its diagnostics to LintMessages.

    Args:
        filename: path of the file to analyze.
        binary: path of the clang-tidy executable.
        build_dir: directory passed to clang-tidy as ``-p``.

    Returns:
        One message per diagnostic line matched by RESULTS_RE in the command's
        stdout, or a single "command-failed" message when the command could
        not be started.
    """
    try:
        proc = run_command(
            [binary, f"-p={build_dir}", *include_args, filename],
        )
    except OSError as err:
        return [
            LintMessage(
                path=filename,
                line=None,
                char=None,
                code="CLANGTIDY",
                severity=LintSeverity.ERROR,
                name="command-failed",
                original=None,
                replacement=None,
                description=(
                    f"Failed due to {err.__class__.__name__}:\n{err}"
                ),
                bypassChangedLineFiltering=None,
            )
        ]

    # clang-tidy will report files relative to the build directory, so resolve
    # them against it. This is done by joining paths instead of os.chdir():
    # the working directory is shared by the whole process, and changing it
    # from one pool thread would break the relative binary, build-dir and
    # file paths being used by the other threads.
    build_root = Path(build_dir)
    lint_messages = []
    for match in RESULTS_RE.finditer(proc.stdout.decode()):
        # Convert the reported path to an absolute path.
        abs_path = str((build_root / match["file"]).resolve())
        column = match["column"]
        message = LintMessage(
            path=abs_path,
            name=match["code"],
            description=match["message"],
            line=int(match["line"]),
            # A missing or negative column carries no usable position.
            char=int(column)
            if column is not None and not column.startswith("-")
            else None,
            code="CLANGTIDY",
            severity=severities.get(match["severity"], LintSeverity.ERROR),
            original=None,
            replacement=None,
            bypassChangedLineFiltering=None,
        )
        lint_messages.append(message)

    return lint_messages
def main() -> None:
    """Command-line entry point of the clang-tidy adapter.

    Parses the arguments (an ``@file`` argument is expanded into the arguments
    it contains). When the binary is missing, prints a single failure message
    and exits with status 0; otherwise checks every path in a thread pool and
    prints each LintMessage to stdout as one JSON object per line.
    """
    parser = argparse.ArgumentParser(
        description="clang-tidy wrapper linter.",
        fromfile_prefix_chars="@",
    )
    parser.add_argument("--binary", required=True, help="clang-tidy binary path")
    parser.add_argument(
        "--build_dir",
        required=True,
        help=("Where the compile_commands.json file is located. "
              "Gets passed to clang-tidy -p"),
    )
    parser.add_argument("--verbose", action="store_true", help="verbose logging")
    parser.add_argument("filenames", nargs="+", help="paths to lint")
    args = parser.parse_args()

    if args.verbose:
        log_level = logging.NOTSET
    elif len(args.filenames) < 1000:
        log_level = logging.DEBUG
    else:
        log_level = logging.INFO
    logging.basicConfig(
        format="<%(threadName)s:%(levelname)s> %(message)s",
        level=log_level,
        stream=sys.stderr,
    )

    binary_present = os.path.exists(args.binary)
    if not binary_present:
        missing_binary = LintMessage(
            path="<none>",
            line=None,
            char=None,
            code="CLANGTIDY",
            severity=LintSeverity.ERROR,
            name="command-failed",
            original=None,
            replacement=None,
            description=(
                f"Could not find clang-tidy binary at {args.binary},"
                " you may need to run `lintrunner init`."
            ),
            bypassChangedLineFiltering=None,
        )
        print(json.dumps(missing_binary._asdict()), flush=True)
        # The problem was reported as a lint message, so exit cleanly.
        exit(0)

    with concurrent.futures.ThreadPoolExecutor(
        max_workers=os.cpu_count(),
        thread_name_prefix="Thread",
    ) as executor:
        # Map each future back to its path so failures can name the file.
        futures = {}
        for path in args.filenames:
            pending = executor.submit(check_file, path, args.binary, args.build_dir)
            futures[pending] = path
        for finished in concurrent.futures.as_completed(futures):
            try:
                for lint_message in finished.result():
                    print(json.dumps(lint_message._asdict()), flush=True)
            except Exception:
                logging.critical('Failed at "%s".', futures[finished])
                raise


if __name__ == "__main__":
    main()

View File

@ -0,0 +1,400 @@
import argparse
import concurrent.futures
import json
import logging
import os
import re
import subprocess
import sys
import time
from enum import Enum
from typing import Any, Dict, List, NamedTuple, Optional, Set, Pattern
# True when running on Windows; consulted by as_posix().
IS_WINDOWS: bool = os.name == "nt"
def eprint(*args: Any, **kwargs: Any) -> None:
    """Print to standard error and flush immediately.

    Takes the same arguments as ``print``; ``file`` and ``flush`` are supplied
    here, so passing either of them raises ``TypeError``.
    """
    stream = sys.stderr
    print(*args, file=stream, flush=True, **kwargs)
class LintSeverity(str, Enum):
    """Severity levels that can be attached to a LintMessage.

    Mixing in ``str`` makes every member a string equal to its value; main()
    also builds members from the text after the colon in ``--severity``.
    """

    ERROR = "error"
    WARNING = "warning"
    ADVICE = "advice"
    DISABLED = "disabled"
class LintMessage(NamedTuple):
    """One lint finding (or adapter failure); main() prints each as a JSON object."""

    # File the message refers to.
    path: str
    # Line of the finding, or None when it is not tied to a line.
    line: Optional[int]
    # Column of the finding, or None when it is not tied to a column.
    char: Optional[int]
    # Linter identifier; this adapter always uses "FLAKE8".
    code: str
    severity: LintSeverity
    # The flake8 error code (e.g. "E302"), or "command-failed".
    name: str
    # Always None in this adapter.
    original: Optional[str]
    replacement: Optional[str]
    # Human-readable explanation.
    description: Optional[str]
    # NOTE(review): consumed by lintrunner; always None here -- semantics to
    # be confirmed against the lintrunner docs.
    bypassChangedLineFiltering: Optional[bool]
def as_posix(name: str) -> str:
    """Return *name* with backslashes replaced by "/" on Windows; unchanged elsewhere."""
    if not IS_WINDOWS:
        return name
    return name.replace("\\", "/")
# The three sets below list the error codes that have documentation at a known
# location; get_issue_documentation_url() uses them to pick the link to show.
# fmt: off
# https://www.flake8rules.com/
DOCUMENTED_IN_FLAKE8RULES: Set[str] = {
    "E101", "E111", "E112", "E113", "E114", "E115", "E116", "E117",
    "E121", "E122", "E123", "E124", "E125", "E126", "E127", "E128", "E129",
    "E131", "E133",
    "E201", "E202", "E203",
    "E211",
    "E221", "E222", "E223", "E224", "E225", "E226", "E227", "E228",
    "E231",
    "E241", "E242",
    "E251",
    "E261", "E262", "E265", "E266",
    "E271", "E272", "E273", "E274", "E275",
    "E301", "E302", "E303", "E304", "E305", "E306",
    "E401", "E402",
    "E501", "E502",
    "E701", "E702", "E703", "E704",
    "E711", "E712", "E713", "E714",
    "E721", "E722",
    "E731",
    "E741", "E742", "E743",
    "E901", "E902", "E999",
    "W191",
    "W291", "W292", "W293",
    "W391",
    "W503", "W504",
    "W601", "W602", "W603", "W604", "W605",
    "F401", "F402", "F403", "F404", "F405",
    "F811", "F812",
    "F821", "F822", "F823",
    "F831",
    "F841",
    "F901",
    "C901",
}
# https://pypi.org/project/flake8-comprehensions/#rules
# NOTE(review): "C413" appears twice below; harmless in a set literal.
DOCUMENTED_IN_FLAKE8COMPREHENSIONS: Set[str] = {
    "C400", "C401", "C402", "C403", "C404", "C405", "C406", "C407", "C408", "C409",
    "C410",
    "C411", "C412", "C413", "C413", "C414", "C415", "C416",
}
# https://github.com/PyCQA/flake8-bugbear#list-of-warnings
DOCUMENTED_IN_BUGBEAR: Set[str] = {
    "B001", "B002", "B003", "B004", "B005", "B006", "B007", "B008", "B009", "B010",
    "B011", "B012", "B013", "B014", "B015",
    "B301", "B302", "B303", "B304", "B305", "B306",
    "B901", "B902", "B903", "B950",
}
# fmt: on
# Matches one line of flake8 output. Examples of the accepted shapes:
# stdin:2: W802 undefined name 'foo'
# stdin:3:6: T484 Name 'foo' is not defined
# stdin:3:-100: W605 invalid escape sequence '\/'
# stdin:3:1: E302 expected 2 blank lines, found 1
# Named groups: file, line, column (optional, may be negative), code (an
# optional trailing colon is not captured) and message. Compiled with (?m) so
# ^/$ match per line and (?x) so whitespace in the pattern is ignored.
RESULTS_RE: Pattern[str] = re.compile(
    r"""(?mx)
^
(?P<file>.*?):
(?P<line>\d+):
(?:(?P<column>-?\d+):)?
\s(?P<code>\S+?):?
\s(?P<message>.*)
$
"""
)
# Doctest holder for RESULTS_RE: the function body does nothing; its docstring
# contains the examples.
def _test_results_re() -> None:
    """
    >>> def t(s): return RESULTS_RE.search(s).groupdict()
    >>> t(r"file.py:80:1: E302 expected 2 blank lines, found 1")
    ... # doctest: +NORMALIZE_WHITESPACE
    {'file': 'file.py', 'line': '80', 'column': '1', 'code': 'E302',
    'message': 'expected 2 blank lines, found 1'}
    >>> t(r"file.py:7:1: P201: Resource `stdout` is acquired but not always released.")
    ... # doctest: +NORMALIZE_WHITESPACE
    {'file': 'file.py', 'line': '7', 'column': '1', 'code': 'P201',
    'message': 'Resource `stdout` is acquired but not always released.'}
    >>> t(r"file.py:8:-10: W605 invalid escape sequence '/'")
    ... # doctest: +NORMALIZE_WHITESPACE
    {'file': 'file.py', 'line': '8', 'column': '-10', 'code': 'W605',
    'message': "invalid escape sequence '/'"}
    """
    pass
def _run_command(
    args: List[str],
    *,
    extra_env: Optional[Dict[str, str]],
) -> "subprocess.CompletedProcess[str]":
    """Run *args* once, capturing stdout and stderr as UTF-8 text.

    Args:
        args: command line to execute.
        extra_env: variables added on top of the current environment for the
            child process; ``None`` or an empty dict leaves it unchanged.

    Raises:
        subprocess.CalledProcessError: on a non-zero exit status.
    """
    logging.debug(
        "$ %s",
        " ".join(
            ([f"{k}={v}" for (k, v) in extra_env.items()] if extra_env else []) + args
        ),
    )
    # Actually hand the extra variables to the child; previously they were
    # only logged. env=None makes subprocess inherit the current environment.
    env = {**os.environ, **extra_env} if extra_env else None
    start_time = time.monotonic()
    try:
        return subprocess.run(
            args,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=True,
            encoding="utf-8",
            env=env,
        )
    finally:
        end_time = time.monotonic()
        logging.debug("took %dms", (end_time - start_time) * 1000)
def run_command(
    args: List[str],
    *,
    extra_env: Optional[Dict[str, str]],
    retries: int,
) -> "subprocess.CompletedProcess[str]":
    """Run *args*, retrying when flake8 reports that linting timed out.

    A failed command is retried (after a one second pause) only while retries
    remain and its stdout starts with flake8's "timed out" X000 message; any
    other ``CalledProcessError`` is re-raised immediately.
    """
    attempts_made = 0
    while True:
        try:
            return _run_command(args, extra_env=extra_env)
        except subprocess.CalledProcessError as err:
            # The stdout check is skipped once the retries are exhausted.
            if attempts_made == retries or (
                re.match(
                    r"^ERROR:1:1: X000 linting with .+ timed out after \d+ seconds",
                    err.stdout,
                )
                is None
            ):
                raise err
            attempts_made += 1
            logging.warning(
                "(%s/%s) Retrying because command failed with: %r",
                attempts_made,
                retries,
                err,
            )
            time.sleep(1)
def get_issue_severity(code: str) -> LintSeverity:
    """Return the default severity for a flake8 error *code*, chosen by prefix."""
    # "B901": `return x` inside a generator
    # "B902": Invalid first argument to a method
    # "B903": __slots__ efficiency
    # "B950": Line too long
    # "C4": Flake8 Comprehensions
    # "C9": Cyclomatic complexity
    # "E2": PEP8 horizontal whitespace "errors"
    # "E3": PEP8 blank line "errors"
    # "E5": PEP8 line length "errors"
    # "F401": Name imported but unused
    # "F403": Star imports used
    # "F405": Name possibly from star imports
    # "T400": type checking Notes
    # "T49": internal type checker errors or unmatched messages
    advice_prefixes = (
        "B9",
        "C4",
        "C9",
        "E2",
        "E3",
        "E5",
        "F401",
        "F403",
        "F405",
        "T400",
        "T49",
    )
    if code.startswith(advice_prefixes):
        return LintSeverity.ADVICE

    # "F821": Undefined name
    # "E999": syntax error
    if code.startswith(("F821", "E999")):
        return LintSeverity.ERROR

    # "F": PyFlakes Error
    # "B": flake8-bugbear Error
    # "E": PEP8 "Error"
    # "W": PEP8 Warning
    # possibly other plugins...
    return LintSeverity.WARNING
def get_issue_documentation_url(code: str) -> str:
    """Return a documentation link for *code*, or "" when none is known."""
    if code in DOCUMENTED_IN_FLAKE8RULES:
        url = f"https://www.flake8rules.com/rules/{code}.html"
    elif code in DOCUMENTED_IN_FLAKE8COMPREHENSIONS:
        url = "https://pypi.org/project/flake8-comprehensions/#rules"
    elif code in DOCUMENTED_IN_BUGBEAR:
        url = "https://github.com/PyCQA/flake8-bugbear#list-of-warnings"
    else:
        url = ""
    return url
def check_file(
    filename: str,
    binary: str,
    flake8_plugins_path: Optional[str],
    severities: Dict[str, LintSeverity],
    retries: int,
) -> List[LintMessage]:
    """Run flake8 on *filename* and convert its output to LintMessages.

    Args:
        filename: path of the file to lint.
        binary: path of the flake8 executable.
        flake8_plugins_path: value for FLAKE8_PLUGINS_PATH, or None.
        severities: per-code severity overrides; codes not listed here use
            get_issue_severity().
        retries: how many times a timed-out run is retried.

    Returns:
        One message per output line matched by RESULTS_RE, or a single
        "command-failed" message when the command could not be run.
    """
    extra_env = (
        {"FLAKE8_PLUGINS_PATH": flake8_plugins_path} if flake8_plugins_path else None
    )
    try:
        proc = run_command(
            [binary, "--exit-zero", filename],
            extra_env=extra_env,
            retries=retries,
        )
    except (OSError, subprocess.CalledProcessError) as err:
        if isinstance(err, subprocess.CalledProcessError):
            # The command ran but exited non-zero: show what it printed.
            failure = (
                "COMMAND (exit code {returncode})\n"
                "{command}\n\n"
                "STDERR\n{stderr}\n\n"
                "STDOUT\n{stdout}"
            ).format(
                returncode=err.returncode,
                command=" ".join(as_posix(x) for x in err.cmd),
                stderr=err.stderr.strip() or "(empty)",
                stdout=err.stdout.strip() or "(empty)",
            )
        else:
            failure = f"Failed due to {err.__class__.__name__}:\n{err}"
        return [
            LintMessage(
                path=filename,
                line=None,
                char=None,
                code="FLAKE8",
                severity=LintSeverity.ERROR,
                name="command-failed",
                original=None,
                replacement=None,
                description=failure,
                bypassChangedLineFiltering=None,
            )
        ]

    messages = []
    for match in RESULTS_RE.finditer(proc.stdout):
        code = match["code"]
        column = match["column"]
        description = match["message"]
        url = get_issue_documentation_url(code)
        if url:
            # Only point at documentation when there is a link to show;
            # otherwise the text would end in a dangling "See ".
            description = "{}\nSee {}".format(description, url)
        messages.append(
            LintMessage(
                path=match["file"],
                name=code,
                description=description,
                line=int(match["line"]),
                # A missing or negative column carries no usable position.
                char=int(column)
                if column is not None and not column.startswith("-")
                else None,
                code="FLAKE8",
                severity=severities.get(code) or get_issue_severity(code),
                original=None,
                replacement=None,
                bypassChangedLineFiltering=None,
            )
        )
    return messages
def main() -> None:
    """Command-line entry point of the flake8 adapter.

    Parses the arguments (an ``@file`` argument is expanded into the arguments
    it contains), checks every path in a thread pool and prints each resulting
    LintMessage to stdout as one JSON object per line.
    """
    parser = argparse.ArgumentParser(
        description="Flake8 wrapper linter.",
        fromfile_prefix_chars="@",
    )
    parser.add_argument("--binary", required=True, help="flake8 binary path")
    parser.add_argument(
        "--flake8-plugins-path", help="FLAKE8_PLUGINS_PATH env value"
    )
    parser.add_argument(
        "--severity",
        action="append",
        help="map code to severity (e.g. `B950:advice`)",
    )
    parser.add_argument(
        "--retries", default=3, type=int, help="times to retry timed out flake8"
    )
    parser.add_argument("--verbose", action="store_true", help="verbose logging")
    parser.add_argument("filenames", nargs="+", help="paths to lint")
    args = parser.parse_args()

    if args.verbose:
        log_level = logging.NOTSET
    elif len(args.filenames) < 1000:
        log_level = logging.DEBUG
    else:
        log_level = logging.INFO
    logging.basicConfig(
        format="<%(threadName)s:%(levelname)s> %(message)s",
        level=log_level,
        stream=sys.stderr,
    )

    flake8_plugins_path = args.flake8_plugins_path
    if flake8_plugins_path is not None:
        flake8_plugins_path = os.path.realpath(flake8_plugins_path)

    # Each --severity value has the form CODE:LEVEL.
    severities: Dict[str, LintSeverity] = {}
    for severity in args.severity or []:
        code, separator, level = severity.partition(":")
        assert separator, f"invalid severity `{severity}`"
        severities[code] = LintSeverity(level)

    with concurrent.futures.ThreadPoolExecutor(
        max_workers=os.cpu_count(),
        thread_name_prefix="Thread",
    ) as executor:
        # Map each future back to its path so failures can name the file.
        futures = {}
        for path in args.filenames:
            pending = executor.submit(
                check_file,
                path,
                args.binary,
                flake8_plugins_path,
                severities,
                args.retries,
            )
            futures[pending] = path
        for finished in concurrent.futures.as_completed(futures):
            try:
                for lint_message in finished.result():
                    print(json.dumps(lint_message._asdict()), flush=True)
            except Exception:
                logging.critical('Failed at "%s".', futures[finished])
                raise


if __name__ == "__main__":
    main()

View File

@ -0,0 +1,145 @@
import argparse
import json
import logging
import os
import subprocess
import sys
import time
from enum import Enum
from typing import Any, List, NamedTuple, Optional
# True when running on Windows; consulted by as_posix().
IS_WINDOWS: bool = os.name == "nt"
def eprint(*args: Any, **kwargs: Any) -> None:
    """Print to standard error and flush immediately.

    Takes the same arguments as ``print``; ``file`` and ``flush`` are supplied
    here, so passing either of them raises ``TypeError``.
    """
    stream = sys.stderr
    print(*args, file=stream, flush=True, **kwargs)
class LintSeverity(str, Enum):
    """Severity levels that can be attached to a LintMessage.

    Mixing in ``str`` makes every member a string equal to its value.
    """

    ERROR = "error"
    WARNING = "warning"
    ADVICE = "advice"
    DISABLED = "disabled"
class LintMessage(NamedTuple):
    """One lint finding (or adapter failure); main() prints each as a JSON object."""

    # File the message refers to ("<none>" when grep could not be started).
    path: str
    # Line of the finding, or None when it is not tied to a line.
    line: Optional[int]
    # Column of the finding; always None in this adapter.
    char: Optional[int]
    # Linter identifier, taken from --linter_name.
    code: str
    severity: LintSeverity
    # Taken from --error_name, or "command-failed".
    name: str
    # Always None in this adapter.
    original: Optional[str]
    replacement: Optional[str]
    # Human-readable explanation, taken from --error_description.
    description: Optional[str]
    # NOTE(review): consumed by lintrunner; always None here -- semantics to
    # be confirmed against the lintrunner docs.
    bypassChangedLineFiltering: Optional[bool]
def as_posix(name: str) -> str:
    """Return *name* with backslashes replaced by "/" on Windows; unchanged elsewhere."""
    if not IS_WINDOWS:
        return name
    return name.replace("\\", "/")
def run_command(
    args: List[str],
) -> "subprocess.CompletedProcess[bytes]":
    """Run *args*, capturing stdout and stderr as bytes.

    A non-zero exit status does not raise. The command line and its
    wall-clock duration are logged at DEBUG level.
    """
    logging.debug("$ %s", " ".join(args))
    started = time.monotonic()
    try:
        completed = subprocess.run(
            args,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        return completed
    finally:
        elapsed_ms = (time.monotonic() - started) * 1000
        logging.debug("took %dms", elapsed_ms)
def main() -> None:
    """Command-line entry point of the grep-based adapter.

    Runs ``grep -nPH <pattern>`` over all given paths at once and prints one
    LintMessage (as a JSON object per line on stdout) for every output line of
    the form ``path:line:text``. Output lines of any other shape are logged
    and skipped instead of aborting the run. If grep cannot be started, a
    single "command-failed" message is printed and the process exits with
    status 0.
    """
    parser = argparse.ArgumentParser(
        description="grep wrapper linter.",
        fromfile_prefix_chars="@",
    )
    parser.add_argument(
        "--pattern",
        required=True,
        help="pattern to grep for",
    )
    parser.add_argument(
        "--linter_name",
        required=True,
        help="name of the linter",
    )
    parser.add_argument(
        "--error_name",
        required=True,
        help="human-readable description of what the error is",
    )
    parser.add_argument(
        "--error_description",
        required=True,
        help="message to display when the pattern is found",
    )
    parser.add_argument(
        "--verbose",
        action="store_true",
        help="verbose logging",
    )
    parser.add_argument(
        "filenames",
        nargs="+",
        help="paths to lint",
    )
    args = parser.parse_args()

    logging.basicConfig(
        format="<%(threadName)s:%(levelname)s> %(message)s",
        level=logging.NOTSET
        if args.verbose
        else logging.DEBUG
        if len(args.filenames) < 1000
        else logging.INFO,
        stream=sys.stderr,
    )

    try:
        proc = run_command(["grep", "-nPH", args.pattern, *args.filenames])
    except OSError as err:
        err_msg = LintMessage(
            path="<none>",
            line=None,
            char=None,
            code=args.linter_name,
            severity=LintSeverity.ERROR,
            name="command-failed",
            original=None,
            replacement=None,
            description=(
                f"Failed due to {err.__class__.__name__}:\n{err}"
            ),
            bypassChangedLineFiltering=None,
        )
        print(json.dumps(err_msg._asdict()), flush=True)
        # sys.exit is always available; the bare `exit` helper comes from `site`.
        sys.exit(0)

    # Only the path and line number are used, so undecodable bytes in the
    # matched text are replaced rather than allowed to raise.
    for line in proc.stdout.decode(errors="replace").splitlines():
        # tools/linter/clangtidy_linter.py:13:import foo.bar.baz
        try:
            path, line_number_text, _matched_text = line.split(":", 2)
            line_number = int(line_number_text)
        except ValueError:
            # e.g. grep's "Binary file ... matches" notice.
            logging.warning("Skipping unparsable grep output: %r", line)
            continue
        msg = LintMessage(
            path=path,
            line=line_number,
            char=None,
            code=args.linter_name,
            severity=LintSeverity.ERROR,
            name=args.error_name,
            original=None,
            replacement=None,
            description=args.error_description,
            bypassChangedLineFiltering=None,
        )
        print(json.dumps(msg._asdict()), flush=True)


if __name__ == "__main__":
    main()

View File

@ -0,0 +1,195 @@
import argparse
import concurrent.futures
import json
import logging
import os
import re
import subprocess
import sys
import time
from enum import Enum
from typing import Any, Dict, List, NamedTuple, Optional, Pattern
# True when running on Windows; consulted by as_posix().
IS_WINDOWS: bool = os.name == "nt"
def eprint(*args: Any, **kwargs: Any) -> None:
    """Print to standard error and flush immediately.

    Takes the same arguments as ``print``; ``file`` and ``flush`` are supplied
    here, so passing either of them raises ``TypeError``.
    """
    stream = sys.stderr
    print(*args, file=stream, flush=True, **kwargs)
class LintSeverity(str, Enum):
    """Severity levels that can be attached to a LintMessage.

    Mixing in ``str`` makes every member a string equal to its value.
    """

    ERROR = "error"
    WARNING = "warning"
    ADVICE = "advice"
    DISABLED = "disabled"
class LintMessage(NamedTuple):
    """One lint finding (or adapter failure) produced by check_file()."""

    # File the message refers to.
    path: str
    # Line of the finding, or None when it is not tied to a line.
    line: Optional[int]
    # Column of the finding, or None when it is not tied to a column.
    char: Optional[int]
    # Linter identifier; this adapter always uses "MYPY".
    code: str
    severity: LintSeverity
    # Bracketed mypy error code (e.g. "[assignment]"), or "command-failed".
    name: str
    # Always None in this adapter.
    original: Optional[str]
    replacement: Optional[str]
    # Human-readable explanation.
    description: Optional[str]
    # NOTE(review): presumably consumed by lintrunner; always None here --
    # semantics to be confirmed against the lintrunner docs.
    bypassChangedLineFiltering: Optional[bool]
def as_posix(name: str) -> str:
    """Return *name* with backslashes replaced by "/" on Windows; unchanged elsewhere."""
    if not IS_WINDOWS:
        return name
    return name.replace("\\", "/")
# Matches one diagnostic line of mypy output, for example:
# tools/linter/flake8_linter.py:15:13: error: Incompatibl...int") [assignment]
# Named groups: file, line, column (optional, may be negative), severity,
# message, and code (the trailing bracketed error code, brackets included).
# Compiled with (?m) so ^/$ match per line and (?x) so whitespace in the
# pattern is ignored.
RESULTS_RE: Pattern[str] = re.compile(
    r"""(?mx)
^
(?P<file>.*?):
(?P<line>\d+):
(?:(?P<column>-?\d+):)?
\s(?P<severity>\S+?):?
\s(?P<message>.*)
\s(?P<code>\[.*\])
$
"""
)
def run_command(
    args: List[str],
    *,
    extra_env: Optional[Dict[str, str]],
    retries: int,
) -> "subprocess.CompletedProcess[bytes]":
    """Run ``args`` as a subprocess, capturing stdout and stderr as bytes.

    Args:
        args: Program to execute followed by its arguments.
        extra_env: Variables layered on top of the current process
            environment for the child. ``None`` or an empty mapping leaves
            the inherited environment untouched.
        retries: Accepted for interface compatibility. No timeout is applied
            to the child process, so there is nothing to retry and the value
            is currently unused.

    Returns:
        The completed process. Its exit status is not checked here.

    Raises:
        OSError: If the program cannot be started.
    """
    logging.debug("$ %s", " ".join(args))
    # env=None tells subprocess to inherit the parent environment as-is; only
    # build a merged copy when the caller actually asked for extra variables.
    env = {**os.environ, **extra_env} if extra_env else None
    start_time = time.monotonic()
    try:
        return subprocess.run(
            args,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            env=env,
        )
    finally:
        # Log the duration even when launching the process raised.
        end_time = time.monotonic()
        logging.debug("took %dms", (end_time - start_time) * 1000)
# Translation from the severity word in the tool's output to a LintSeverity.
# Severity is either "error" or "note": https://git.io/JiLOP
severities: Dict[str, LintSeverity] = {
    "error": LintSeverity.ERROR,
    "note": LintSeverity.ADVICE,
}
def _command_failed(filename: str, description: str) -> LintMessage:
    """Build the error message reported when the mypy run itself went wrong."""
    return LintMessage(
        path=filename,
        line=None,
        char=None,
        code="MYPY",
        severity=LintSeverity.ERROR,
        name="command-failed",
        original=None,
        replacement=None,
        description=description,
        bypassChangedLineFiltering=None,
    )


def check_file(
    filename: str,
    binary: str,
    retries: int,
) -> List[LintMessage]:
    """Run ``binary`` on ``filename`` and convert its output to lint messages.

    Args:
        filename: Path handed to the binary as its only argument.
        binary: Path or name of the mypy executable.
        retries: Forwarded unchanged to ``run_command``.

    Returns:
        One ``LintMessage`` per output line matched by ``RESULTS_RE``. If the
        binary cannot be started, or exits non-zero without any parseable
        line, a single "command-failed" message is returned instead of an
        empty list so the failure is not mistaken for a clean run.
    """
    try:
        proc = run_command(
            [binary, filename],
            extra_env={},
            retries=retries,
        )
    except OSError as err:
        return [
            _command_failed(
                filename,
                f"Failed due to {err.__class__.__name__}:\n{err}",
            )
        ]

    stdout = str(proc.stdout, "utf-8").strip()
    messages = [
        LintMessage(
            path=match["file"],
            name=match["code"],
            description=match["message"],
            line=int(match["line"]),
            # A missing or negative column carries no usable position.
            char=int(match["column"])
            if match["column"] is not None and not match["column"].startswith("-")
            else None,
            code="MYPY",
            # Unknown severity words are treated as errors.
            severity=severities.get(match["severity"], LintSeverity.ERROR),
            original=None,
            replacement=None,
            bypassChangedLineFiltering=None,
        )
        for match in RESULTS_RE.finditer(stdout)
    ]

    # A non-zero exit with nothing parseable means the tool itself failed
    # (or printed something unexpected); report it rather than dropping it.
    if proc.returncode != 0 and not messages:
        stderr = proc.stderr.decode("utf-8", errors="replace").strip()
        details = "\n\n".join(part for part in (stderr, stdout) if part)
        messages.append(
            _command_failed(
                filename,
                f"{binary} exited with code {proc.returncode} "
                f"but produced no parseable output:\n{details}",
            )
        )
    return messages
def main() -> None:
    """Parse the command line, check every file in parallel, print JSON results.

    Each lint message is written to stdout as one JSON object per line;
    logging goes to stderr. Arguments may also be supplied through a file
    referenced as ``@path``.
    """
    arg_parser = argparse.ArgumentParser(
        description="mypy wrapper linter.",
        fromfile_prefix_chars="@",
    )
    arg_parser.add_argument("--binary", required=True, help="mypy binary path")
    arg_parser.add_argument(
        "--retries", default=3, type=int, help="times to retry timed out mypy"
    )
    arg_parser.add_argument("--verbose", action="store_true", help="verbose logging")
    arg_parser.add_argument("filenames", nargs="+", help="paths to lint")
    options = arg_parser.parse_args()

    # Pick the log level: everything when verbose, otherwise DEBUG for small
    # runs and INFO once the file list reaches 1000 entries.
    if options.verbose:
        log_level = logging.NOTSET
    elif len(options.filenames) < 1000:
        log_level = logging.DEBUG
    else:
        log_level = logging.INFO
    logging.basicConfig(
        format="<%(threadName)s:%(levelname)s> %(message)s",
        level=log_level,
        stream=sys.stderr,
    )

    with concurrent.futures.ThreadPoolExecutor(
        max_workers=os.cpu_count(),
        thread_name_prefix="Thread",
    ) as pool:
        # Remember which file each future belongs to for error reporting.
        pending = {}
        for filename in options.filenames:
            job = pool.submit(check_file, filename, options.binary, options.retries)
            pending[job] = filename

        for finished in concurrent.futures.as_completed(pending):
            try:
                for lint_message in finished.result():
                    print(json.dumps(lint_message._asdict()), flush=True)
            except Exception:
                # Name the offending file, then let the error propagate.
                logging.critical('Failed at "%s".', pending[finished])
                raise
# Run the linter only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()

View File

@ -55,14 +55,14 @@ def download_bin(name: str, output_dir: str, platform_to_url: Dict[str, str]) ->
Downloads the binary appropriate for the host platform and stores it in the given output directory.
"""
if HOST_PLATFORM not in platform_to_url:
print(f"Unsupported platform: {HOST_PLATFORM}")
print(f"Unsupported platform: {HOST_PLATFORM}", file=sys.stderr)
return False
url = platform_to_url[HOST_PLATFORM]
filename = os.path.join(output_dir, name)
# Try to download binary.
print(f"Downloading {name} to {output_dir}")
print(f"Downloading {name} to {output_dir}", file=sys.stderr)
try:
urllib.request.urlretrieve(
url,
@ -70,10 +70,10 @@ def download_bin(name: str, output_dir: str, platform_to_url: Dict[str, str]) ->
reporthook=report_download_progress if sys.stdout.isatty() else None,
)
except urllib.error.URLError as e:
print(f"Error downloading {filename}: {e}")
print(f"Error downloading {filename}: {e}", file=sys.stderr)
return False
finally:
print()
print(file=sys.stderr)
return True
@ -96,11 +96,11 @@ def download(
try:
os.mkdir(output_dir)
except OSError as e:
print(f"Unable to create directory for {name} binary: {output_dir}")
print(f"Unable to create directory for {name} binary: {output_dir}", file=sys.stderr)
return False
finally:
if verbose:
print(f"Created directory {output_dir} for {name} binary")
print(f"Created directory {output_dir} for {name} binary", file=sys.stderr)
# If the directory didn't exist, neither did the binary, so download it.
ok = download_bin(name, output_dir, platform_to_url)
@ -116,21 +116,21 @@ def download(
return False
else:
if verbose:
print(f"Found pre-existing {name} binary, skipping download")
print(f"Found pre-existing {name} binary, skipping download", file=sys.stderr)
# Now that the binary is where it should be, hash it.
actual_bin_hash = compute_file_sha256(output_path)
# If the host platform is not in platform_to_hash, it is unsupported.
if HOST_PLATFORM not in platform_to_hash:
print(f"Unsupported platform: {HOST_PLATFORM}")
print(f"Unsupported platform: {HOST_PLATFORM}", file=sys.stderr)
return False
# This is the path to the file containing the reference hash.
hashpath = os.path.join(PYTORCH_ROOT, platform_to_hash[HOST_PLATFORM])
if not os.path.exists(hashpath):
print("Unable to find reference binary hash")
print("Unable to find reference binary hash", file=sys.stderr)
return False
# Load the reference hash and compare the actual hash to it.
@ -138,20 +138,20 @@ def download(
reference_bin_hash = f.readline().strip()
if verbose:
print(f"Reference Hash: {reference_bin_hash}")
print(f"Actual Hash: {repr(actual_bin_hash)}")
print(f"Reference Hash: {reference_bin_hash}", file=sys.stderr)
print(f"Actual Hash: {repr(actual_bin_hash)}", file=sys.stderr)
if reference_bin_hash != actual_bin_hash:
print("The downloaded binary is not what was expected!")
print(f"Downloaded hash: {repr(actual_bin_hash)} vs expected {reference_bin_hash}")
print("The downloaded binary is not what was expected!", file=sys.stderr)
print(f"Downloaded hash: {repr(actual_bin_hash)} vs expected {reference_bin_hash}", file=sys.stderr)
# Err on the side of caution and try to delete the downloaded binary.
try:
os.unlink(output_path)
print("The binary has been deleted just to be safe")
print("The binary has been deleted just to be safe", file=sys.stderr)
except OSError as e:
print(f"Failed to delete binary: {e}")
print("Delete this binary as soon as possible and do not execute it!")
print(f"Failed to delete binary: {e}", file=sys.stderr)
print("Delete this binary as soon as possible and do not execute it!", file=sys.stderr)
return False
else:
@ -159,6 +159,6 @@ def download(
mode = os.stat(output_path).st_mode
mode |= stat.S_IXUSR
os.chmod(output_path, mode)
print(f"Using {name} located at {output_path}")
print(f"Using {name} located at {output_path}", file=sys.stderr)
return True