[CI] test upload: better check for if job is rerun disabled tests (#148027)

Some disabled test runs weren't being uploaded as disabled tests because some dynamo tests mark themselves as skipped when they are failing. This makes the script think that there are fewer retries than there actually are, and therefore that the job is not a rerun disabled tests job. Instead, query for the job name and check whether it contains "rerun disabled tests", falling back to counting the number of retries if the query fails.

Alternate options: relax the check for the number of tests
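
For illustration, a minimal sketch of the failure mode described above. The constant value and per-test counts here are made up; the real MAX_RETRY_IN_NON_DISABLED_MODE and the test dicts come from tools/stats/upload_stats_lib.py.

MAX_RETRY_IN_NON_DISABLED_MODE = 3  # hypothetical value, for illustration only

tests = {
    # a normal test in a rerun disabled tests job is run many times
    "test_foo": {"num_green": 10, "num_red": 0},
    # a dynamo test that marks itself skipped when failing records only a couple of runs
    "test_dynamo_bar": {"num_green": 0, "num_red": 1},
}

looks_like_rerun_disabled = all(
    t.get("num_green", 0) + t.get("num_red", 0) > MAX_RETRY_IN_NON_DISABLED_MODE
    for t in tests.values()
)
# False: the self-skipping test hides the reruns, which is why the new code
# also checks whether the job name contains "rerun_disabled_tests"
print(looks_like_rerun_disabled)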
Pull Request resolved: https://github.com/pytorch/pytorch/pull/148027
Approved by: https://github.com/huydhn
Catherine Lee
2025-02-28 00:04:33 +00:00
committed by PyTorch MergeBot
parent fc78192b1d
commit 2978771c9d
2 changed files with 59 additions and 5 deletions


@@ -9,7 +9,7 @@ import time
 import zipfile
 from functools import lru_cache
 from pathlib import Path
-from typing import Any, Callable, Optional
+from typing import Any, Callable, cast, Optional

 import boto3  # type: ignore[import]
 import requests
@@ -245,15 +245,24 @@ def unzip(p: Path) -> None:
         zip.extractall(unzipped_dir)


-def is_rerun_disabled_tests(tests: dict[str, dict[str, int]]) -> bool:
+def is_rerun_disabled_tests(
+    report: Path,
+    workflow_run_id: int,
+    workflow_run_attempt: int,
+    tests: dict[str, dict[str, int]],
+) -> bool:
     """
     Check if the test report is coming from rerun_disabled_tests workflow where
     each test is run multiple times
     """
-    return all(
+    if all(
         t.get("num_green", 0) + t.get("num_red", 0) > MAX_RETRY_IN_NON_DISABLED_MODE
         for t in tests.values()
-    )
+    ):
+        return True
+    job_id = get_job_id(report)
+    job_name = get_job_name(job_id, workflow_run_id, workflow_run_attempt)
+    return job_name is not None and "rerun_disabled_tests" in job_name


 def get_job_id(report: Path) -> int | None:
@@ -266,3 +275,46 @@ def get_job_id(report: Path) -> int | None:
         return int(report.parts[0].rpartition("_")[2])
     except ValueError:
         return None
+
+
+@lru_cache
+def get_job_name(
+    id: int | None, workflow_id: int | None, workflow_run_attempt: int | None
+) -> str | None:
+    if id is None:
+        return None
+    try:
+        if workflow_id is None:
+            response = requests.get(
+                f"{PYTORCH_REPO}/actions/jobs/{id}",
+                headers=_get_request_headers(),
+            )
+            if response.status_code != 200:
+                return None
+            return cast(str, response.json()["name"])
+        else:
+
+            @lru_cache
+            def _get_jobs(workflow_id: int) -> dict[int, str]:
+                jobs: dict[int, str] = {}
+                # Paginate
+                page = 1
+                while True:
+                    response = requests.get(
+                        f"{PYTORCH_REPO}/actions/runs/{workflow_id}/attempts/{workflow_run_attempt}/jobs",
+                        headers=_get_request_headers(),
+                        params={"page": page, "per_page": 100},
+                    )
+                    if response.status_code != 200:
+                        return jobs
+                    for job in response.json()["jobs"]:
+                        jobs[job["id"]] = job["name"]
+                    if "next" not in response.links:
+                        break
+                    page += 1
+                return jobs
+
+            jobs = _get_jobs(workflow_id)
+            return jobs[id]
+    except Exception:
+        return None
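
A hedged usage sketch (not part of the diff) of how the new helpers fit together. The report path layout and the ids below are invented for illustration; get_job_id only relies on the first path component ending in "_<job id>", and the functions are assumed to be imported from tools/stats/upload_stats_lib.py.

from pathlib import Path

report = Path("test-reports-runattempt1_12345678901/python-pytest/report.xml")
job_id = get_job_id(report)  # -> 12345678901, or None if the path doesn't parse
job_name = get_job_name(job_id, 9876543210, 1)  # cached GitHub API lookup; None on failure
if job_name is not None and "rerun_disabled_tests" in job_name:
    print("treat this report as coming from a rerun disabled tests job")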