Mirror of https://github.com/pytorch/pytorch.git, synced 2025-10-20 21:14:14 +08:00
Adds script to generate alerts for failing jobs (#102002)
Copies over bits of the script from test-infra to grab the relevant parts of an alert and turn them into JSON. Generally copied over from check_alerts in pytorch/test-infra.

### 🤖 Generated by Copilot at 1789c36

> _`Python 3` shebang_
> _added for compatibility_
> _a good practice / spring_

Pull Request resolved: https://github.com/pytorch/pytorch/pull/102002
Approved by: https://github.com/huydhn, https://github.com/ZainRizvi
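For context, here is a minimal sketch (not part of the diff) of how the new module is meant to be driven; the function and flag names come from create_alerts.py below, while the argument values and output formatting are illustrative only:

# Hypothetical programmatic use (assumes the repository root is on PYTHONPATH).
# The CLI equivalent, per parse_args() below, is roughly:
#   python tools/alerts/create_alerts.py --repo pytorch/pytorch --branch main
import json

from tools.alerts.create_alerts import get_recurrently_failing_jobs_alerts

alerts = get_recurrently_failing_jobs_alerts(
    repo="pytorch/pytorch", branch="main", job_name_regex=""
)
print(json.dumps(alerts, indent=2))
# Each alert entry is shaped like:
# {"AlertType": "Recurrently Failing Job", "AlertObject": "jobName: <job name>",
#  "OncallTeams": "", "OncallIndividuals": "", "Flags": ""}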
tools/alerts/__init__.py (new file, 0 lines)
tools/alerts/create_alerts.py (new file, 318 lines)
@@ -0,0 +1,318 @@
#!/usr/bin/env python3

import argparse
import json
import os
import re
from collections import defaultdict
from difflib import SequenceMatcher
from typing import Any, Dict, List, Set, Tuple

import requests
from setuptools import distutils  # type: ignore[import]

ALL_SKIPPED_THRESHOLD = 100
SIMILARITY_THRESHOLD = 0.75
FAILURE_CHAIN_THRESHOLD = 2
MAX_CONCURRENT_ALERTS = 1
FAILED_JOB_PATTERN = (
    r"^- \[(.*)\]\(.*\) failed consecutively starting with commit \[.*\]\(.*\)$"
)

PENDING = "pending"
NEUTRAL = "neutral"
SKIPPED = "skipped"
SUCCESS = "success"
FAILURE = "failure"
CANCELED = "canceled"

ISSUES_WITH_LABEL_QUERY = """
query ($owner: String!, $name: String!, $labels: [String!]) {
  repository(owner: $owner, name: $name, followRenames: false) {
    issues(last: 10, labels: $labels, states: [OPEN]) {
      nodes {
        id
        title
        closed
        number
        body
        createdAt
        comments(first: 100) {
          nodes {
            bodyText
            databaseId
          }
        }
      }
    }
  }
}
"""

NUM_ISSUES_QUERY = """
query ($query: String!) {
  search(type: ISSUE, query: $query) {
    issueCount
  }
}
"""

DISABLED_ALERTS = [
    "rerun_disabled_tests",
    "unstable",
]


class JobStatus:
    job_name: str = ""
    jobs: List[Any] = []
    current_status: Any = None
    job_statuses: List[Any] = []
    filtered_statuses: List[Any] = []
    failure_chain: List[Any] = []
    flaky_jobs: List[Any] = []

    def __init__(self, job_name: str, job_statuses: List[Any]):
        self.job_name = job_name
        self.job_statuses = job_statuses

        self.filtered_statuses = list(
            filter(lambda j: not is_job_skipped(j), job_statuses)
        )
        self.current_status = self.get_current_status()
        self.failure_chain = self.get_most_recent_failure_chain()
        self.flaky_jobs = self.get_flaky_jobs()

    def get_current_status(self) -> Any:
        """
        When getting the current status, we want the latest status which is not pending,
        be it success or failure
        """
        for status in self.filtered_statuses:
            if status["conclusion"] != PENDING:
                return status
        return None

    def get_unique_failures(self, jobs: List[Any]) -> Dict[str, List[Any]]:
        """
        Returns the jobs from the input list grouped by (similar) failureCaptures
        """
        failures = defaultdict(list)
        for job in jobs:
            if job["conclusion"] == "failure":
                found_similar_failure = False
                if "failureCaptures" not in job:
                    failures["unclassified"] = [job]
                    continue

                # This is now a list returned by HUD API, not a string
                failureCaptures = " ".join(job["failureCaptures"])

                for failure in failures:
                    seq = SequenceMatcher(None, failureCaptures, failure)
                    if seq.ratio() > SIMILARITY_THRESHOLD:
                        failures[failure].append(job)
                        found_similar_failure = True
                        break
                if not found_similar_failure:
                    failures[failureCaptures] = [job]

        return failures

    # A job is flaky if it is the only job with that failureCapture and it is not the most recent job
    def get_flaky_jobs(self) -> List[Any]:
        unique_failures = self.get_unique_failures(self.filtered_statuses)
        flaky_jobs = []
        for failure in unique_failures:
            failure_list = unique_failures[failure]
            if (
                len(failure_list) == 1
                and failure_list[0]["sha"] != self.current_status["sha"]
            ):
                flaky_jobs.append(failure_list[0])
        return flaky_jobs

    # The most recent failure chain is an array of jobs that have the same-ish failures.
    # A success in the middle of the chain will terminate the chain.
    def get_most_recent_failure_chain(self) -> List[Any]:
        failures = []
        found_most_recent_failure = False

        for job in self.filtered_statuses:
            if is_job_failed(job):
                failures.append(job)
                found_most_recent_failure = True
            if found_most_recent_failure and not is_job_failed(job):
                break

        return failures

    def should_alert(self) -> bool:
        # Group jobs by their failures. The length of the failure chain is used
        # to raise the alert, so we can do a simple tweak here to use the length
        # of the longest unique chain
        unique_failures = self.get_unique_failures(self.failure_chain)

        return (
            self.current_status is not None
            and self.current_status["conclusion"] != SUCCESS
            and any(
                len(failure_chain) >= FAILURE_CHAIN_THRESHOLD
                for failure_chain in unique_failures.values()
            )
            and all(
                disabled_alert not in self.job_name
                for disabled_alert in DISABLED_ALERTS
            )
        )

    def __repr__(self) -> str:
        return f"jobName: {self.job_name}"


def fetch_hud_data(repo: str, branch: str) -> Any:
    response = requests.get(f"https://hud.pytorch.org/api/hud/{repo}/{branch}/0")
    response.raise_for_status()
    hud_data = json.loads(response.text)
    return (hud_data["jobNames"], hud_data["shaGrid"])


# Creates a Dict of Job Name -> [JobData]. Essentially a Column in HUD
def map_job_data(jobNames: Any, shaGrid: Any) -> Dict[str, Any]:
    jobData = defaultdict(list)
    for sha in shaGrid:
        for ind, job in enumerate(sha["jobs"]):
            jobData[jobNames[ind]].append(job)
    return jobData


def is_job_failed(job: Any) -> bool:
    conclusion = job["conclusion"] if "conclusion" in job else None
    return conclusion is not None and conclusion != SUCCESS and conclusion != PENDING


def is_job_skipped(job: Any) -> bool:
    conclusion = job["conclusion"] if "conclusion" in job else None
    return conclusion in (NEUTRAL, SKIPPED) or conclusion is None


def get_failed_jobs(job_data: List[Any]) -> List[Any]:
    return [job for job in job_data if job["conclusion"] == "failure"]


def classify_jobs(
    all_job_names: List[str], sha_grid: Any, filtered_jobs_names: Set[str]
) -> Tuple[List[JobStatus], List[Any]]:
    """
    Creates JobStatus objects, which contain the logic for deciding whether to alert and which jobs are flaky.
    Classifies jobs into jobs to alert on and flaky jobs.
    :param all_job_names: list of all job names as returned by the HUD
    :param sha_grid: list of all job data as returned by the HUD (parallel index to all_job_names)
    :param filtered_jobs_names: set of job names to actually consider
    :return: a tuple of (jobs to alert on, flaky jobs)
    """
    print(sha_grid[:1])
    job_data = map_job_data(all_job_names, sha_grid)
    job_statuses: List[JobStatus] = []
    for job in job_data:
        job_statuses.append(JobStatus(job, job_data[job]))

    jobs_to_alert_on = []
    flaky_jobs = []

    for job_status in job_statuses:
        if job_status.job_name not in filtered_jobs_names:
            continue
        if job_status.should_alert():
            jobs_to_alert_on.append(job_status)
        flaky_jobs.extend(job_status.flaky_jobs)

    return jobs_to_alert_on, flaky_jobs


# filter job names that don't match the regex
def filter_job_names(job_names: List[str], job_name_regex: str) -> List[str]:
    if job_name_regex:
        return [
            job_name for job_name in job_names if re.match(job_name_regex, job_name)
        ]
    return job_names


def get_recurrently_failing_jobs_alerts(
    repo: str, branch: str, job_name_regex: str
) -> List[Dict[str, Any]]:
    job_names, sha_grid = fetch_hud_data(repo=repo, branch=branch)

    filtered_job_names = set(filter_job_names(job_names, job_name_regex))
    if job_name_regex:
        print()
        print(f"Filtered to {len(filtered_job_names)} jobs:")
        if len(filtered_job_names) == 0:
            print("No jobs matched the regex")
        elif len(filtered_job_names) == len(job_names):
            print("All jobs matched the regex")
        else:
            print("\n".join(filtered_job_names))

    (recurrently_failing_jobs, flaky_jobs) = classify_jobs(
        job_names, sha_grid, filtered_job_names
    )

    alerts = []
    for job in recurrently_failing_jobs:
        entry = {
            "AlertType": "Recurrently Failing Job",
            "AlertObject": job.__repr__(),
            "OncallTeams": "",
            "OncallIndividuals": "",
            "Flags": "",
        }
        alerts.append(entry)
    return alerts


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--repo",
        help="Repository to do checks for",
        type=str,
        default=os.getenv("REPO_TO_CHECK", "pytorch/pytorch"),
    )
    parser.add_argument(
        "--branch",
        help="Branch to do checks for",
        type=str,
        default=os.getenv("BRANCH_TO_CHECK", "main"),
    )
    parser.add_argument(
        "--job-name-regex",
        help="Consider only job names matching given regex (if omitted, all jobs are matched)",
        type=str,
        default=os.getenv("JOB_NAME_REGEX", ""),
    )
    parser.add_argument(
        "--with-flaky-test-alert",
        help="Run this script with the flaky test alerting",
        type=distutils.util.strtobool,
        default=os.getenv("WITH_FLAKY_TEST_ALERT", "YES"),
    )
    parser.add_argument(
        "--dry-run",
        help="Whether or not to actually post issues",
        type=distutils.util.strtobool,
        default=os.getenv("DRY_RUN", "YES"),
    )
    return parser.parse_args()


if __name__ == "__main__":
    args = parse_args()
    print(
        json.dumps(
            get_recurrently_failing_jobs_alerts(
                args.repo, args.branch, args.job_name_regex
            )
        )
    )
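As a point of reference, a small standalone sketch (not part of the diff) of the SequenceMatcher-based grouping that get_unique_failures above relies on; the failure strings here are made up for illustration:

from difflib import SequenceMatcher

SIMILARITY_THRESHOLD = 0.75  # same cutoff as in create_alerts.py

timeout_a = "##[error]The action has timed out."
timeout_b = "##[error]The operation has timed out."
oom = "RuntimeError: CUDA out of memory"

# Near-identical timeout messages fall into the same failure bucket...
print(SequenceMatcher(None, timeout_a, timeout_b).ratio() > SIMILARITY_THRESHOLD)  # True
# ...while an unrelated failure starts a new bucket of its own.
print(SequenceMatcher(None, timeout_a, oom).ratio() > SIMILARITY_THRESHOLD)  # False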
tools/test/test_create_alerts.py (new file, 73 lines)
@@ -0,0 +1,73 @@
from typing import Any, List
from unittest import main, TestCase

from tools.alerts.create_alerts import filter_job_names, JobStatus

JOB_NAME = "periodic / linux-xenial-cuda10.2-py3-gcc7-slow-gradcheck / test (default, 2, 2, linux.4xlarge.nvidia.gpu)"
MOCK_TEST_DATA = [
    {
        "sha": "f02f3046571d21b48af3067e308a1e0f29b43af9",
        "id": 7819529276,
        "conclusion": "failure",
        "htmlUrl": "https://github.com/pytorch/pytorch/runs/7819529276?check_suite_focus=true",
        "logUrl": "https://ossci-raw-job-status.s3.amazonaws.com/log/7819529276",
        "durationS": 14876,
        "failureLine": "##[error]The action has timed out.",
        "failureContext": "",
        "failureCaptures": ["##[error]The action has timed out."],
        "failureLineNumber": 83818,
        "repo": "pytorch/pytorch",
    },
    {
        "sha": "d0d6b1f2222bf90f478796d84a525869898f55b6",
        "id": 7818399623,
        "conclusion": "failure",
        "htmlUrl": "https://github.com/pytorch/pytorch/runs/7818399623?check_suite_focus=true",
        "logUrl": "https://ossci-raw-job-status.s3.amazonaws.com/log/7818399623",
        "durationS": 14882,
        "failureLine": "##[error]The action has timed out.",
        "failureContext": "",
        "failureCaptures": ["##[error]The action has timed out."],
        "failureLineNumber": 72821,
        "repo": "pytorch/pytorch",
    },
]


class TestGitHubPR(TestCase):
    # Should alert when the job history is ? ? Fail Fail
    def test_alert(self) -> None:
        modified_data: List[Any] = [{}]
        modified_data.append({})
        modified_data.extend(MOCK_TEST_DATA)
        status = JobStatus(JOB_NAME, modified_data)
        self.assertTrue(status.should_alert())

    # test filter job names
    def test_job_filter(self) -> None:
        job_names = [
            "pytorch_linux_xenial_py3_6_gcc5_4_test",
            "pytorch_linux_xenial_py3_6_gcc5_4_test2",
        ]
        self.assertListEqual(
            filter_job_names(job_names, ""),
            job_names,
            "empty regex should match all jobs",
        )
        self.assertListEqual(filter_job_names(job_names, ".*"), job_names)
        self.assertListEqual(filter_job_names(job_names, ".*xenial.*"), job_names)
        self.assertListEqual(
            filter_job_names(job_names, ".*xenial.*test2"),
            ["pytorch_linux_xenial_py3_6_gcc5_4_test2"],
        )
        self.assertListEqual(filter_job_names(job_names, ".*xenial.*test3"), [])
        self.assertRaises(
            Exception,
            lambda: filter_job_names(job_names, "["),
            msg="malformed regex should throw exception",
        )


if __name__ == "__main__":
    main()
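For contrast with test_alert above, a hedged sketch (not part of the diff) of a history that should not alert: should_alert() returns False as soon as the most recent non-pending job succeeded, even if older commits failed. Only the fields JobStatus actually reads are filled in, and the sha values and job name are placeholders:

from tools.alerts.create_alerts import JobStatus  # assumes the repository root is importable

# Newest entry first, mirroring the HUD ordering used by MOCK_TEST_DATA above.
recovered_history = [
    {"sha": "aaa", "conclusion": "success", "failureCaptures": []},
    {"sha": "bbb", "conclusion": "failure", "failureCaptures": ["##[error]The action has timed out."]},
    {"sha": "ccc", "conclusion": "failure", "failureCaptures": ["##[error]The action has timed out."]},
]
status = JobStatus("example / linux-job / test (default, 1, 1)", recovered_history)
assert not status.should_alert()  # latest job is green, so no alert is raised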