Revert "trymerge to ignore certain failures (#91134)"

This reverts commit 8b7bd5dffccf342cacae510d6c5a6ca2665770b7.

Reverted https://github.com/pytorch/pytorch/pull/91134 on behalf of https://github.com/seemethere because it breaks the internal `github-export-checks`; see the failure: https://fburl.com/sandcastle/ggqj29pz
PyTorch MergeBot
2023-02-04 08:08:32 +00:00
parent 170a3e0257
commit 7fb2ac2bd5
6 changed files with 90 additions and 6206 deletions

@ -15,6 +15,7 @@ from typing import (
Callable,
Dict,
List,
NamedTuple,
Optional,
Pattern,
Tuple,
@ -38,15 +39,10 @@ from trymerge_explainer import (
get_revert_message,
)
class JobCheckState:
def __init__(self, name: str, url: str, status: Optional[str], classification: Optional[str] = None):
self.name = name
self.url = url
self.status = status
self.classification = classification
def __repr__(self) -> str:
return f"JobCheckState([{self.name},{self.url},{self.status},{self.classification}])"
class JobCheckState(NamedTuple):
name: str
url: str
status: Optional[str]
JobNameToStateDict = Dict[str, JobCheckState]
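A minimal sketch of the restored NamedTuple form in use (the job name and URL below are made up):

state = JobCheckState(
    name="pull / linux-focal-py3.8-gcc7 / build",
    url="https://github.com/pytorch/pytorch/actions/runs/0",
    status="SUCCESS",  # None would mean the job is still pending
)
workflow_checks: JobNameToStateDict = {state.name: state}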
@ -57,18 +53,6 @@ class WorkflowCheckState:
self.status: Optional[str] = status
self.jobs: JobNameToStateDict = {}
class FlakyRule:
def __init__(self, name: str, captures: List[str]):
self.name = name
self.captures = captures
def matches(self, job: Optional[Dict[str, Any]]) -> bool:
return (
job is not None
and self.name in job.get('name', '')
and job.get("failure_captures") is not None
and all([capture in job.get("failure_captures", []) for capture in self.captures])
)
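For context on the logic being reverted, a sketch of how FlakyRule.matches evaluates a job record (the rule and job values are hypothetical; the dict keys mirror the fields referenced above):

rule = FlakyRule(name="macos-12-py3", captures=["Process completed with exit code 1"])
job = {
    "name": "macos-12-py3-arm64 / test (default, 1, 2)",
    "failure_captures": ["Process completed with exit code 1"],
}
assert rule.matches(job)       # rule name is a substring and every capture is present
assert not rule.matches(None)  # a missing job never matches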
GH_PR_REVIEWS_FRAGMENT = """
fragment PRReviews on PullRequestReviewConnection {
@ -459,31 +443,27 @@ def _fetch_url(url: str, *,
print(f"Rate limit exceeded: {err.headers['X-RateLimit-Used']}/{err.headers['X-RateLimit-Limit']}")
raise
def _fetch_json_any(
url: str,
params: Optional[Dict[str, Any]] = None,
data: Optional[Dict[str, Any]] = None
) -> Any:
def fetch_json(url: str,
params: Optional[Dict[str, Any]] = None,
data: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
headers = {'Accept': 'application/vnd.github.v3+json'}
if params is not None and len(params) > 0:
url += '?' + '&'.join(f"{name}={urllib.parse.quote(str(val))}" for name, val in params.items())
return _fetch_url(url, headers=headers, data=data, reader=json.load)
def fetch_json_list(url: str,
params: Optional[Dict[str, Any]] = None,
data: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
return cast(List[Dict[str, Any]], _fetch_json_any(url, params, data))
return cast(List[Dict[str, Any]], _fetch_url(url, headers=headers, data=data, reader=json.load))
def fetch_json_dict(url: str,
params: Optional[Dict[str, Any]] = None,
data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
return cast(Dict[str, Any], _fetch_json_any(url, params, data))
headers = {'Accept': 'application/vnd.github.v3+json'}
if params is not None and len(params) > 0:
url += '?' + '&'.join(f"{name}={urllib.parse.quote(str(val))}" for name, val in params.items())
return cast(Dict[str, Any], _fetch_url(url, headers=headers, data=data, reader=json.load))
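A usage sketch of the restored fetch_json, mirroring the check_for_sev call further down; the query string is taken from there, and total_count is a standard field of GitHub's search response:

# Illustrative call: params are percent-encoded with urllib.parse.quote and
# appended to the URL before _fetch_url issues the request.
response = cast(
    Dict[str, Any],
    fetch_json(
        "https://api.github.com/search/issues",
        params={"q": 'repo:pytorch/pytorch is:open is:issue label:"ci: sev"'},
    ),
)
open_sev_count = response.get("total_count", 0)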
def _gh_post_comment(url: str, comment: str, dry_run: bool = False) -> List[Dict[str, Any]]:
if dry_run:
print(comment)
return []
return fetch_json_list(url, data={"body": comment})
return fetch_json(url, data={"body": comment})
def gh_post_pr_comment(org: str, project: str, pr_num: int, comment: str, dry_run: bool = False) -> List[Dict[str, Any]]:
@ -495,8 +475,8 @@ def gh_post_commit_comment(org: str, project: str, sha: str, comment: str, dry_r
def gh_add_labels(org: str, project: str, pr_num: int, labels: Union[str, List[str]]) -> None:
fetch_json_list(f'https://api.github.com/repos/{org}/{project}/issues/{pr_num}/labels',
data={"labels": labels})
fetch_json(f'https://api.github.com/repos/{org}/{project}/issues/{pr_num}/labels',
data={"labels": labels})
def gh_graphql(query: str, **kwargs: Any) -> Dict[str, Any]:
@ -700,7 +680,6 @@ class GitHubPR:
self.comments: Optional[List[GitHubComment]] = None
self._authors: Optional[List[Tuple[str, str]]] = None
self._reviews: Optional[List[Tuple[str, str]]] = None
self.merge_base: Optional[str] = None
def is_closed(self) -> bool:
return bool(self.info["closed"])
@ -732,26 +711,6 @@ class GitHubPR:
def last_commit(self) -> Any:
return self.info["commits"]["nodes"][-1]["commit"]
def fetch(self, branch_name: Optional[str] = None) -> None:
repo = GitRepo(get_git_repo_dir(), get_git_remote_name())
if branch_name is None:
branch_name = f"__pull-request-{self.pr_num}__init__"
try:
r = repo._run_git("rev-parse", branch_name)
if r.strip() == self.last_commit()['oid']:
return
except Exception:
pass
repo.fetch(f"pull/{self.pr_num}/head", branch_name)
def get_merge_base(self) -> str:
if self.merge_base is not None:
return self.merge_base
self.fetch()
gitrepo = GitRepo(get_git_repo_dir(), get_git_remote_name())
self.merge_base = gitrepo.get_merge_base("origin/master", self.last_commit()['oid'])
return self.merge_base
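The reverted fetch/get_merge_base pair reduces to the following git plumbing (a sketch; the PR number and head commit below are placeholders):

pr_num = 12345  # hypothetical PR number
repo = GitRepo(get_git_repo_dir(), get_git_remote_name())
repo.fetch(f"pull/{pr_num}/head", f"__pull-request-{pr_num}__init__")
head_oid = "0123456789abcdef0123456789abcdef01234567"  # placeholder for last_commit()['oid']
merge_base = repo.get_merge_base("origin/master", head_oid)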
def get_changed_files(self) -> List[str]:
if self.changed_files is None:
info = self.info
@ -1061,7 +1020,7 @@ class GitHubPR:
if not self.is_ghstack_pr():
msg = self.gen_commit_message()
pr_branch_name = f"__pull-request-{self.pr_num}__init__"
self.fetch(pr_branch_name)
repo.fetch(f"pull/{self.pr_num}/head", pr_branch_name)
repo._run_git("merge", "--squash", pr_branch_name)
repo._run_git("commit", f"--author=\"{self.get_author()}\"", "-m", msg)
return []
@ -1119,7 +1078,7 @@ class MergeRule:
patterns: List[str]
approved_by: List[str]
mandatory_checks_name: Optional[List[str]]
ignore_flaky_failures: bool = True
def gen_new_issue_link(
org: str,
@ -1133,9 +1092,8 @@ def gen_new_issue_link(
f"template={urllib.parse.quote(template)}")
def read_merge_and_flaky_rules(repo: Optional[GitRepo], org: str, project: str) -> Tuple[List[MergeRule], List[FlakyRule]]:
def read_merge_rules(repo: Optional[GitRepo], org: str, project: str) -> List[MergeRule]:
repo_relative_rules_path = MERGE_RULE_PATH
rc = None
if repo is None:
json_data = _fetch_url(
f"https://api.github.com/repos/{org}/{project}/contents/{repo_relative_rules_path}",
@ -1143,24 +1101,15 @@ def read_merge_and_flaky_rules(repo: Optional[GitRepo], org: str, project: str)
reader=json.load,
)
content = base64.b64decode(json_data["content"])
rc = yaml.safe_load(content)
return [MergeRule(**x) for x in yaml.safe_load(content)]
else:
rules_path = Path(repo.repo_dir) / repo_relative_rules_path
if not rules_path.exists():
print(f"{rules_path} does not exist, returning empty rules")
return [], []
return []
with open(rules_path) as fp:
rc = yaml.safe_load(fp)
merge_rules = []
flaky_rules = []
for x in rc:
try:
merge_rules.append(MergeRule(**x))
except Exception as e:
if "flaky_rules_location_url" in x:
flaky_rules = get_flaky_rules(x["flaky_rules_location_url"], 3)
return merge_rules, flaky_rules
return [MergeRule(**x) for x in rc]
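For orientation, a sketch of the rules-file shape that both the removed reader and the restored read_merge_rules deserialize into MergeRule objects (the removed variant additionally accepts an entry carrying flaky_rules_location_url); the name field and every value here are illustrative assumptions:

import yaml

example_yaml = """
- name: Documentation
  patterns:
    - docs/**
    - '**/*.md'
  approved_by:
    - some-reviewer
  mandatory_checks_name:
    - EasyCLA
    - Lint
"""
rules = [MergeRule(**entry) for entry in yaml.safe_load(example_yaml)]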
def find_matching_merge_rule(
@ -1173,6 +1122,7 @@ def find_matching_merge_rule(
"""Returns merge rule matching to this pr or raises an exception"""
changed_files = pr.get_changed_files()
approved_by = set(pr.get_approved_by())
checks = get_combined_checks_from_pr_and_land_validation(pr, land_check_commit)
issue_link = gen_new_issue_link(
org=pr.org,
@ -1181,12 +1131,10 @@ def find_matching_merge_rule(
)
reject_reason = f"No rule found to match PR. Please [report]{issue_link} this issue to DevX team."
rules, flaky_rules = read_merge_and_flaky_rules(repo, pr.org, pr.project)
rules = read_merge_rules(repo, pr.org, pr.project)
if not rules:
reject_reason = f"Rejecting the merge as no rules are defined for the repository in {MERGE_RULE_PATH}"
raise RuntimeError(reject_reason)
checks = get_combined_checks_from_pr_and_land_validation(pr, land_check_commit)
checks = get_classifications(pr.last_commit()['oid'], pr.get_merge_base(), checks, flaky_rules)
# PRs can fail multiple merge rules, but it only needs to pass one rule to be approved.
# If it fails all rules, we need to find the rule that it came closest to passing and report
@ -1250,11 +1198,7 @@ def find_matching_merge_rule(
# Does the PR pass the checks required by this rule?
mandatory_checks = rule.mandatory_checks_name if rule.mandatory_checks_name is not None else []
required_checks = list(filter(lambda x: "EasyCLA" in x or not skip_mandatory_checks, mandatory_checks))
[pending_checks, failed_checks] = categorize_checks(
checks,
required_checks,
ok_failed_checks_threshold=3 if rule.ignore_flaky_failures else 0
)
[pending_checks, failed_checks] = categorize_checks(checks, required_checks)
hud_link = f"https://hud.pytorch.org/{pr.org}/{pr.project}/commit/{pr.last_commit()['oid']}"
if len(failed_checks) > 0:
@ -1321,92 +1265,6 @@ def checks_to_str(checks: List[Tuple[str, Optional[str]]]) -> str:
def checks_to_markdown_bullets(checks: List[Tuple[str, Optional[str]]]) -> List[str]:
return [f"- [{c[0]}]({c[1]})" if c[1] is not None else f"- {c[0]}" for c in checks[:5]]
def get_flaky_rules(url: str, num_retries: int = 3) -> List[FlakyRule]:
try:
return [FlakyRule(**rule) for rule in fetch_json_list(url)]
except Exception as e:
print(f"Could not download {url} because: {e}.")
if num_retries > 0:
return get_flaky_rules(url, num_retries=num_retries - 1)
return []
def get_rockset_results(head_sha: str, merge_base: str, num_retries: int = 3) -> List[Dict[str, Any]]:
query = f"""
SELECT
w.name as workflow_name,
j.id,
j.name,
j.conclusion,
j.completed_at,
j.html_url,
j.head_sha,
j.torchci_classification.captures as failure_captures,
LENGTH(j.steps) as steps,
FROM
commons.workflow_job j join commons.workflow_run w on w.id = j.run_id
where
j.head_sha in ('{head_sha}','{merge_base}')
"""
try:
import rockset # type: ignore[import]
res = rockset.RocksetClient(
host="api.usw2a1.rockset.com", api_key=os.environ["ROCKSET_API_KEY"]
).sql(query)
return cast(List[Dict[str, Any]], res.results)
except ModuleNotFoundError:
print("Could not use RockSet as rocket dependency is missing")
return []
except Exception as e:
print(f"Could not download rockset data because: {e}.")
if num_retries > 0:
return get_rockset_results(head_sha, merge_base, num_retries=num_retries - 1)
return []
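Each row returned by the Rockset query above has roughly this shape when consumed by get_classifications below (all values here are invented):

example_row = {
    "workflow_name": "pull",
    "id": 987654321,
    "name": "linux-focal-py3.8-gcc7 / test (default, 1, 2)",
    "conclusion": "failure",
    "completed_at": "2023-02-03T12:34:56Z",
    "html_url": "https://github.com/pytorch/pytorch/actions/runs/0/jobs/0",
    "head_sha": "0123456789abcdef0123456789abcdef01234567",
    "failure_captures": ["Process completed with exit code 1"],
    "steps": 20,
}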
def get_classifications(
head_sha: str,
merge_base: str,
checks: Dict[str, JobCheckState],
flaky_rules: List[FlakyRule]
) -> Dict[str, JobCheckState]:
rockset_results = get_rockset_results(head_sha, merge_base)
head_sha_jobs: Dict[str, Dict[str, Any]] = {}
merge_base_jobs: Dict[str, Dict[str, Any]] = {}
def insert(d: Dict[str, Dict[str, Any]], key: str, val: Dict[str, Any]) -> None:
if key not in d:
d[key] = val
return
if d[key]["id"] < val["id"]:
d[key] = val
for rockset_result in rockset_results:
name = f"{rockset_result['workflow_name']} / {rockset_result['name']}"
if rockset_result["head_sha"] == head_sha:
insert(head_sha_jobs, name, rockset_result)
else:
insert(merge_base_jobs, name, rockset_result)
for name, check in checks.items():
if check.status == "SUCCESS":
continue
head_sha_job = head_sha_jobs.get(name)
merge_base_job = merge_base_jobs.get(name)
if (
head_sha_job is not None
and merge_base_job is not None
and head_sha_job["conclusion"] == merge_base_job["conclusion"]
and head_sha_job["failure_captures"] == merge_base_job["failure_captures"]
):
check.classification = "BROKEN_TRUNK"
elif any([rule.matches(head_sha_job) for rule in flaky_rules]):
check.classification = "FLAKY"
return checks
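In effect, the reverted logic applies this decision per failing check (a condensed restatement, not the actual function):

def classify(head_job, base_job, flaky_rules):
    # Same conclusion and same failure lines on the merge base: trunk was already broken.
    if (
        head_job is not None
        and base_job is not None
        and head_job["conclusion"] == base_job["conclusion"]
        and head_job["failure_captures"] == base_job["failure_captures"]
    ):
        return "BROKEN_TRUNK"
    # Otherwise, any matching flaky rule marks the failure as flaky.
    if any(rule.matches(head_job) for rule in flaky_rules):
        return "FLAKY"
    return None

failing = {"name": "pull / linux-test", "conclusion": "failure",
           "failure_captures": ["RuntimeError: CUDA out of memory"]}
# The same failure on the merge base means the trunk, not this PR, is broken.
assert classify(failing, dict(failing), []) == "BROKEN_TRUNK"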
def get_combined_checks_from_pr_and_land_validation(
pr: GitHubPR,
land_check_commit: Optional[str],
@ -1509,7 +1367,7 @@ def check_for_sev(org: str, project: str, skip_mandatory_checks: bool) -> None:
return
response = cast(
Dict[str, Any],
fetch_json_list(
fetch_json(
"https://api.github.com/search/issues",
params={"q": f'repo:{org}/{project} is:open is:issue label:"ci: sev"'},
),
@ -1542,11 +1400,9 @@ def has_label(labels: List[str], pattern: Pattern[str] = CIFLOW_LABEL) -> bool:
def categorize_checks(
check_runs: JobNameToStateDict,
required_checks: List[str],
ok_failed_checks_threshold: int = 3
) -> Tuple[List[Tuple[str, Optional[str]]], List[Tuple[str, Optional[str]]]]:
pending_checks: List[Tuple[str, Optional[str]]] = []
failed_checks: List[Tuple[str, Optional[str]]] = []
ok_failed_checks: List[Tuple[str, Optional[str]]] = []
relevant_checknames = [name for name in check_runs.keys() if any([x in name for x in required_checks])]
@ -1557,23 +1413,7 @@ def categorize_checks(
if check_runs[checkname].status is None:
pending_checks.append((checkname, check_runs[checkname].url))
elif not is_passing_status(check_runs[checkname].status):
if check_runs[checkname].classification in ('BROKEN_TRUNK', 'FLAKY'):
ok_failed_checks.append((checkname, check_runs[checkname].url))
else:
failed_checks.append((checkname, check_runs[checkname].url))
if ok_failed_checks:
print(
f"The following {len(ok_failed_checks)} checks failed but were likely due flakiness or broken trunk: " +
", ".join([x[0] for x in ok_failed_checks]) +
(f" but this is greater than the threshold of {ok_failed_checks_threshold} so merge will fail"
if len(ok_failed_checks) > ok_failed_checks_threshold
else '')
)
if len(ok_failed_checks) > ok_failed_checks_threshold:
failed_checks = failed_checks + ok_failed_checks
failed_checks.append((checkname, check_runs[checkname].url))
return (pending_checks, failed_checks)
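A sketch of the threshold behaviour this hunk reverts, using the pre-revert four-field JobCheckState and the ok_failed_checks_threshold parameter shown above (check names, URLs, and statuses are hypothetical):

checks = {
    "pull / linux-test": JobCheckState(
        "pull / linux-test", "https://example.com/job/1", "FAILURE", "FLAKY"),
    "pull / win-test": JobCheckState(
        "pull / win-test", "https://example.com/job/2", "FAILURE", None),
}
pending, failed = categorize_checks(
    checks, ["linux-test", "win-test"], ok_failed_checks_threshold=3)
# One FLAKY failure is under the threshold, so it is tolerated;
# the unclassified failure is still reported.
assert pending == []
assert [name for name, _ in failed] == ["pull / win-test"]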
def merge(pr_num: int, repo: GitRepo,
@ -1635,7 +1475,6 @@ def merge(pr_num: int, repo: GitRepo,
start_time = time.time()
last_exception = ''
elapsed_time = 0.0
_, flaky_rules = read_merge_and_flaky_rules(repo, pr.org, pr.project)
while elapsed_time < timeout_minutes * 60:
check_for_sev(org, project, skip_mandatory_checks)
current_time = time.time()
@ -1649,23 +1488,15 @@ def merge(pr_num: int, repo: GitRepo,
try:
required_checks = []
failed_rule_message = None
ignore_flaky_failures = True
try:
find_matching_merge_rule(pr, repo)
except MandatoryChecksMissingError as ex:
if ex.rule is not None:
ignore_flaky_failures = ex.rule.ignore_flaky_failures
if ex.rule.mandatory_checks_name is not None:
required_checks = ex.rule.mandatory_checks_name
if ex.rule is not None and ex.rule.mandatory_checks_name is not None:
required_checks = ex.rule.mandatory_checks_name
failed_rule_message = ex
checks = get_combined_checks_from_pr_and_land_validation(pr, land_check_commit)
checks = get_classifications(pr.last_commit()['oid'], pr.get_merge_base(), checks, flaky_rules)
pending, failing = categorize_checks(
checks,
required_checks + [x for x in checks.keys() if x not in required_checks],
ok_failed_checks_threshold=3 if ignore_flaky_failures else 0
)
pending, failing = categorize_checks(checks, required_checks + [x for x in checks.keys() if x not in required_checks])
# HACK until GitHub will be better about surfacing those
startup_failures = filter_checks_with_lambda(checks, lambda status: status == "STARTUP_FAILURE")
if len(startup_failures) > 0: