Compare commits

...

13 Commits

SHA1 Message Date
5a138566e0 tc 2025-10-21 10:55:34 -07:00
852824203a tc 2025-10-20 14:46:17 -07:00
29a32c3e8c tc 2025-10-20 11:27:47 -07:00
ffef93522f tc 2025-10-19 22:36:55 -07:00
a046318b94 tc 2025-10-19 20:57:45 -07:00
318a36ed3e tc 2025-10-19 20:48:47 -07:00
d6ea618258 tc 2025-10-19 20:23:22 -07:00
815c62c515 tc 2025-10-19 18:15:48 -07:00
73157ebe76 move code around 2025-10-17 15:50:03 -07:00
8d6f573c43 fix indexing when shuffling 2025-10-17 15:23:34 -07:00
ea20910cbf job info stuff 2025-10-17 15:16:12 -07:00
b4541ea800 tc 2025-10-17 14:37:37 -07:00
870c71cf60 tc 2025-10-17 14:08:25 -07:00
7 changed files with 280 additions and 47 deletions

View File

@@ -71,9 +71,11 @@ jobs:
JOB_NAME: ${{ steps.get-job-id.outputs.job-name }}
PR_NUMBER: ${{ github.event.pull_request.number }}
run: |
set -euox pipefail
unzip -o .additional_ci_files/llm_results/mappings.zip -d .additional_ci_files/llm_results || true
python3 -m pip install boto3==1.35.42
python3 tools/testing/do_target_determination_for_s3.py
python3 tools/testing/do_target_determination_for_s3.py \
--workflow-ref "${{ github.workflow_ref }}" \
- name: Upload TD results to s3
uses: seemethere/upload-artifact-s3@baba72d0712b404f646cebe0730933554ebce96a # v5.1.0
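
For reference, the value passed via --workflow-ref is the github.workflow_ref context string. A minimal sketch of how the script reduces it to a repo-relative path; the example ref mirrors the comment in do_td_with_job_info.py further down and is illustrative only:

# Minimal sketch, assuming the github.workflow_ref format shown in the
# comment in do_td_with_job_info.py below; the ref value is illustrative.
ref = "pytorch/pytorch/.github/workflows/pull.yml@refs/pull/165793/merge"
path = ".github/workflows/" + ref.split("@")[0].split(".github/workflows/")[1]
assert path == ".github/workflows/pull.yml"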

View File: test/run_test.py

@@ -1993,10 +1993,12 @@ def main():
test_directory = str(REPO_ROOT / "test")
selected_tests = get_selected_tests(options)
test_prioritizations = import_results()
if len(test_prioritizations.get_all_tests()) == 0:
testsToRun = import_results(
f"{os.environ.get('JOB_NAME', '')}|{os.environ.get('TEST_CONFIG', '')}"
)
if len(testsToRun.included) == 0:
options.enable_td = False
test_prioritizations.amend_tests(selected_tests)
testsToRun.amend_tests(selected_tests)
os.makedirs(REPO_ROOT / "test" / "test-reports", exist_ok=True)
@@ -2040,13 +2042,10 @@ def main():
s += "".join(f" {test}\n" for test in parallel)
return s.strip()
percent_to_run = 25 if options.enable_td else 100
print_to_stderr(
f"Running {percent_to_run}% of tests based on TD"
if options.enable_td
else "Running all tests"
)
include, exclude = test_prioritizations.get_top_per_tests(percent_to_run)
include, exclude = testsToRun.included, testsToRun.excluded
if not options.enable_td:
print("TD based test selection is disabled")
include = include + exclude
test_batch = TestBatch("tests to run", include, False)
test_batch_exclude = TestBatch("excluded", exclude, True)
@@ -2097,14 +2096,12 @@ def main():
if IS_CI:
for test, _ in all_failures:
test_stats = test_prioritizations.get_test_stats(test)
print_to_stderr("Emiting td_test_failure_stats_v2")
emit_metric(
"td_test_failure_stats_v2",
{
"selected_tests": selected_tests,
"failure": str(test),
**test_stats,
},
)
gen_additional_test_failures_file(
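
In short, the include/exclude split is now computed offline per job and only recombined here. A minimal sketch of the selection step above; TestsToRun stands in for the class added in interface.py below, and enable_td mirrors options.enable_td:

# Minimal sketch of the selection logic above, under those assumptions.
def select(tests_to_run, enable_td):
    # Per-job split computed offline by do_target_determination_for_s3.py
    include, exclude = tests_to_run.included, tests_to_run.excluded
    if not enable_td:
        # TD disabled: fold the excluded tests back into the include list
        include = include + exclude
    return include, exclude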

View File: tools/testing/do_target_determination_for_s3.py

@@ -1,3 +1,4 @@
import argparse
import json
import os
import sys
@@ -19,28 +20,45 @@ from tools.stats.import_test_stats import (
)
from tools.stats.upload_metrics import emit_metric
from tools.testing.discover_tests import TESTS
from tools.testing.target_determination.determinator import (
from tools.testing.target_determination import (
AggregatedHeuristics,
get_job_info_from_workflow_file,
get_test_prioritizations,
TestPrioritizations,
TestsToRun,
)
sys.path.remove(str(REPO_ROOT))
def import_results() -> TestPrioritizations:
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Run target determination with job info"
)
parser.add_argument(
"--workflow-ref",
type=str,
default="",
help="Path to the GitHub workflow file to parse, this should correspond to github.workflow_ref in the github context",
)
args = parser.parse_args()
return args
def import_results(job_name: str) -> TestsToRun:
if not (REPO_ROOT / ".additional_ci_files/td_results.json").exists():
print("No TD results found")
return TestPrioritizations([], {})
return TestsToRun([], [])
with open(REPO_ROOT / ".additional_ci_files/td_results.json") as f:
td_results = json.load(f)
tp = TestPrioritizations.from_json(td_results)
return tp
res = {k: TestsToRun.from_json(v) for k, v in td_results.items()}
if job_name not in res:
print(f"Job name {job_name} not found in TD results, using default")
return res.get(job_name, res.get("default", TestsToRun([], [])))
def main() -> None:
args = parse_args()
selected_tests = TESTS
aggregated_heuristics: AggregatedHeuristics = AggregatedHeuristics(selected_tests)
@@ -54,10 +72,22 @@ def main() -> None:
copy_pytest_cache()
copy_additional_previous_failures()
job_info = get_job_info_from_workflow_file(args.workflow_ref)
print(f"Job info: {json.dumps(job_info, indent=2)}")
aggregated_heuristics = get_test_prioritizations(selected_tests)
test_prioritizations = aggregated_heuristics.get_aggregated_priorities()
recommended_cutoffs_per_job = test_prioritizations.get_recommended_cutoffs(job_info)
json_serialized_cutoffs = {
k: v.to_json() for k, v in recommended_cutoffs_per_job.items()
}
print("Recommended Cutoffs Per Job:")
print(json.dumps(json_serialized_cutoffs, indent=2))
print("Aggregated Heuristics")
print(test_prioritizations.get_info_str(verbose=False))
@@ -74,7 +104,7 @@ def main() -> None:
)
with open(REPO_ROOT / "td_results.json", "w") as f:
f.write(json.dumps(test_prioritizations.to_json()))
f.write(json.dumps(json_serialized_cutoffs, indent=2))
if __name__ == "__main__":
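
A sketch of the td_results.json shape this implies; the job and config names below are hypothetical, but the "<JOB_NAME>|<TEST_CONFIG>" key format and the "default" fallback match import_results above:

import os

# Hypothetical td_results.json contents; keys are "<JOB_NAME>|<TEST_CONFIG>"
# plus a "default" fallback, each mapping to a TestsToRun JSON object.
td_results = {
    "linux-jammy-py3-gcc11 / test|default": {
        "included": [],  # TestRun JSON objects, highest priority first
        "excluded": [],
    },
    "default": {
        "included": [],
        "excluded": [],
    },
}
key = f"{os.environ.get('JOB_NAME', '')}|{os.environ.get('TEST_CONFIG', '')}"
entry = td_results.get(key, td_results["default"])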

View File: tools/testing/target_determination/__init__.py

@@ -0,0 +1,11 @@
from tools.testing.target_determination.determinator import (
get_test_prioritizations as get_test_prioritizations,
)
from tools.testing.target_determination.do_td_with_job_info import (
get_job_info_from_workflow_file as get_job_info_from_workflow_file,
)
from tools.testing.target_determination.heuristics import (
AggregatedHeuristics as AggregatedHeuristics,
TestPrioritizations as TestPrioritizations,
TestsToRun as TestsToRun,
)

View File: tools/testing/target_determination/do_td_with_job_info.py

@@ -0,0 +1,123 @@
import re
from pathlib import Path
from typing import Any
HAS_PYYAML = True
try:
import yaml
except ImportError:
print("Please install pyyaml to use target determination features.")
HAS_PYYAML = False
REPO_ROOT = Path(__file__).resolve().parents[3]
def get_job_info_from_workflow_file(workflow_file: str) -> list[list[dict[str, Any]]]:
"""
Returns groups of jobs that are similar based on the test configurations
they run.
This is fairly hardcoded and therefore fragile, but it returns a reasonably
accurate mapping.
TODO: replace with a better (automated?) system, e.g. a separate workflow
that generates an artifact describing which jobs are similar according to
what tests they run, correlation, etc., and that also looks at jobs on the
main branch or merge base to better determine which jobs exist.
"""
if not HAS_PYYAML:
return []
# In CI this usually takes the form
# pytorch/pytorch/.github/workflows/pull.yml@refs/pull/165793/merge
workflow_file = workflow_file.split("@")[0].split(".github/workflows/")
workflow_file = ".github/workflows/" + workflow_file[1]
regex = r"needs\.([a-zA-Z0-9_-]+)\.outputs\.test-matrix"
with open(REPO_ROOT / workflow_file) as f:
yml = yaml.safe_load(f)
raw_jobs = yml.get("jobs", {})
jobs: list[dict[str, Any]] = []
dependent_jobs = {}
for job, job_info in raw_jobs.items():
if "test-matrix" not in job_info.get("with", {}):
continue
try:
test_matrix = yaml.safe_load(job_info["with"]["test-matrix"])
if "include" not in test_matrix:
# ${{ needs.linux-jammy-cuda12_8-py3_10-gcc11-sm86-build.outputs.test-matrix }}
match = re.search(regex, test_matrix)
if match:
dep_job = match.group(1)
dependent_jobs[f"{job_info.get('name', job)}"] = {
"depends_on": f"{dep_job}",
"uses": job_info.get("uses", ""),
}
continue
except yaml.YAMLError as e:
print(f"Error parsing test-matrix for job {job}: {e}")
continue
jobs.append(
{
"job_id": f"{job}",
"job_name": f"{job_info.get('name', job)}",
"test_matrix": sorted(
{entry["config"] for entry in test_matrix["include"]}
),
"uses": job_info.get("uses", ""),
}
)
# Fill in dependent jobs
for job, info in dependent_jobs.items():
for j in jobs:
if j["job_id"] == info["depends_on"]:
jobs.append(
{
"job_id": job,
"job_name": job,
"test_matrix": j["test_matrix"],
"uses": info["uses"],
}
)
break
# Drop jobs whose "uses" doesn't reference a test workflow
jobs = [j for j in jobs if "test" in j["uses"]]
individual_jobs = [
{"job_name": j["job_name"], "config": config, "uses": j.get("uses", "")}
for j in jobs
for config in j["test_matrix"]
]
# Group the jobs together
# generally same test config -> same group
grouped_jobs: dict[str, list[dict[str, Any]]] = {}
for job in individual_jobs:
key = []
if "onnx" in job["job_name"]:
key.append("onnx")
if "bazel" in job["job_name"]:
key.append("bazel")
if "cuda" in job["job_name"]:
key.append("cuda")
if "mac" in job["job_name"]:
key.append("mac")
if "windows" in job["job_name"]:
key.append("windows")
key.append(job["config"])
key.append(job["uses"])
key_str = "|".join(sorted(key))
if key_str not in grouped_jobs:
grouped_jobs[key_str] = []
grouped_jobs[key_str].append(job)
for group in grouped_jobs.values():
for j in group:
j.pop("uses", None)
return list(grouped_jobs.values())
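
The return value nests as list[list[dict]]: one inner list per group of similar jobs, each entry carrying a job_name and config (the "uses" key is stripped at the end). An illustrative shape, with made-up job names:

# Illustrative return value of get_job_info_from_workflow_file; the job
# names and configs are made up. Each inner list is one group of similar jobs.
example_groups = [
    [
        # e.g. grouped under a key like "cuda|default|<uses>"
        {"job_name": "linux-cuda-build-a / test", "config": "default"},
        {"job_name": "linux-cuda-build-b / test", "config": "default"},
    ],
    [
        # a single-member group, e.g. "onnx|default|<uses>"
        {"job_name": "onnx-build / test", "config": "default"},
    ],
]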

View File: tools/testing/target_determination/heuristics/__init__.py

@@ -16,6 +16,7 @@ from tools.testing.target_determination.heuristics.historical_edited_files import
from tools.testing.target_determination.heuristics.interface import (
AggregatedHeuristics as AggregatedHeuristics,
TestPrioritizations as TestPrioritizations,
TestsToRun as TestsToRun,
)
from tools.testing.target_determination.heuristics.llm import LLM
from tools.testing.target_determination.heuristics.mentioned_in_pr import MentionedInPR

View File: tools/testing/target_determination/heuristics/interface.py

@@ -11,6 +11,44 @@ if TYPE_CHECKING:
from collections.abc import Iterable, Iterator
class TestsToRun:
"""
Describes which tests to include and exclude.
"""
included: list[TestRun]
excluded: list[TestRun]
def __init__(self, included: list[TestRun], excluded: list[TestRun]) -> None:
self.included = included
self.excluded = excluded
@staticmethod
def from_json(json_dict: dict[str, Any]) -> TestsToRun:
return TestsToRun(
included=[TestRun.from_json(tr_json) for tr_json in json_dict["included"]],
excluded=[TestRun.from_json(tr_json) for tr_json in json_dict["excluded"]],
)
def to_json(self) -> dict[str, Any]:
return {
"included": [tr.to_json() for tr in self.included],
"excluded": [tr.to_json() for tr in self.excluded],
}
def amend_tests(self, tests: list[str]) -> None:
"""
Removes unknown tests and adds any missing tests
"""
self.included = [tr for tr in self.included if tr.test_file in tests]
self.excluded = [tr for tr in self.excluded if tr.test_file in tests]
self.included = [
TestRun(test)
for test in tests
if test not in [tr.test_file for tr in self.included + self.excluded]
] + self.included
class TestPrioritizations:
"""
Describes the results of whether heuristics consider a test relevant or not.
@@ -116,12 +154,62 @@ class TestPrioritizations:
"""Returns all tests in the TestPrioritizations"""
return [x[1] for x in self._traverse_scores()]
def get_top_per_tests(self, n: int) -> tuple[list[TestRun], list[TestRun]]:
"""Divides list of tests into two based on the top n% of scores. The
first list is the top, and the second is the rest."""
tests = [x[1] for x in self._traverse_scores()]
index = n * len(tests) // 100 + 1
return tests[:index], tests[index:]
def shuffle_tests_among_jobs(self, total_jobs: int) -> list[list[TestRun]]:
tests = self.get_all_tests()
jobs: list[list[TestRun]] = [[] for _ in range(total_jobs)]
top_10_percent_index = len(tests) // 10 + 1
top_tests = tests[:top_10_percent_index]
rest_tests = tests[top_10_percent_index:]
# Round robin distribute the rest of the tests among jobs
for i in range(len(rest_tests)):
jobs[i % total_jobs].append(rest_tests[i])
# Concatenate the per-job lists in rotated order so every job ends up with every test
jobs_rotated: list[list[TestRun]] = [[] for _ in range(total_jobs)]
for job_index in range(total_jobs):
for offset in range(total_jobs):
jobs_rotated[job_index].extend(jobs[(job_index + offset) % total_jobs])
# Now add the top tests to each job at the front
all_jobs = []
for job_index in range(total_jobs):
tests_for_job = top_tests + jobs_rotated[job_index]
assert len(tests_for_job) == len(tests)
assert set(tests_for_job) == set(tests)
all_jobs.append(tests_for_job)
return all_jobs
def get_recommended_cutoffs(
self, job_info: list[list[dict[str, Any]]]
) -> dict[str, TestsToRun]:
"""
Given job info from the workflow file, returns a dict mapping job names to
TestsToRun objects that describe which tests to include and exclude.
"""
cutoffs: dict[str, TestsToRun] = {}
cutoff_percent = 0.3
cutoff_index = int(len(self._test_scores) * cutoff_percent) + 1
for job_group in job_info:
jobs_for_tests = self.shuffle_tests_among_jobs(len(job_group))
for i, job in enumerate(job_group):
job_name = job["job_name"]
test_config = job["config"]
tests_for_job = jobs_for_tests[i]
cutoffs[f"{job_name}|{test_config}"] = TestsToRun(
included=tests_for_job[:cutoff_index],
excluded=tests_for_job[cutoff_index:],
)
all_tests = self.get_all_tests()
cutoffs["default"] = TestsToRun(
included=all_tests[:cutoff_index],
excluded=all_tests[cutoff_index:],
)
return cutoffs
def get_info_str(self, verbose: bool = True) -> str:
info = ""
@@ -186,26 +274,6 @@ class TestPrioritizations:
)
return test_prioritizations
def amend_tests(self, tests: list[str]) -> None:
"""
Removes tests that are not in the given list from the
TestPrioritizations. Adds tests that are in the list but not in the
TestPrioritizations.
"""
valid_scores = {
test: score
for test, score in self._test_scores.items()
if test.test_file in tests
}
self._test_scores = valid_scores
for test in tests:
if test not in self._original_tests:
self._test_scores[TestRun(test)] = 0
self._original_tests = frozenset(tests)
self.validate()
class AggregatedHeuristics:
"""
@@ -298,6 +366,7 @@ class AggregatedHeuristics:
json_dict: dict[str, Any] = {}
for heuristic, heuristic_results in self._heuristic_results.items():
json_dict[heuristic.name] = heuristic_results.to_json()
del json_dict[heuristic.name]["_original_tests"] # Avoid duplication
return json_dict
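
To make the shuffle concrete, here is a standalone trace of shuffle_tests_among_jobs using plain strings for ten hypothetical tests (t0 scored highest) and two jobs; get_recommended_cutoffs then keeps roughly the top 30% (cutoff_percent = 0.3) of each per-job ordering as included and marks the rest excluded:

# Standalone trace of shuffle_tests_among_jobs, with plain strings instead
# of TestRun objects; the test names are hypothetical.
tests = [f"t{i}" for i in range(10)]      # sorted by score, t0 highest
top = tests[: len(tests) // 10 + 1]       # top 10% -> ["t0", "t1"]
rest = tests[len(top):]
jobs = [rest[j::2] for j in range(2)]     # round-robin across 2 jobs
rotated = [jobs[j] + jobs[(j + 1) % 2] for j in range(2)]
all_jobs = [top + r for r in rotated]     # top tests lead every job
# all_jobs[0] == ["t0","t1","t2","t4","t6","t8","t3","t5","t7","t9"]
# all_jobs[1] == ["t0","t1","t3","t5","t7","t9","t2","t4","t6","t8"]
assert all(set(j) == set(tests) for j in all_jobs)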