mirror of
https://github.com/pytorch/pytorch.git
synced 2025-10-20 21:14:14 +08:00
This reverts commit 7dfab082be9eaeeee95c7b0363e59c824c6a9009. Reverted https://github.com/pytorch/pytorch/pull/106347 on behalf of https://github.com/clee2000 due to probably broke sharding ([comment](https://github.com/pytorch/pytorch/pull/106347#issuecomment-1675542738))
342 lines
11 KiB
Python
342 lines
11 KiB
Python
import datetime
|
|
import gzip
|
|
import inspect
|
|
import io
|
|
import json
|
|
import os
|
|
import time
|
|
import uuid
|
|
import zipfile
|
|
|
|
from decimal import Decimal
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List
|
|
from warnings import warn
|
|
|
|
import boto3 # type: ignore[import]
|
|
import requests
|
|
import rockset # type: ignore[import]
|
|
|
|
# Base URL for the GitHub REST API of the pytorch/pytorch repository.
PYTORCH_REPO = "https://api.github.com/repos/pytorch/pytorch"

# Shared boto3 S3 resource; credentials are taken from the ambient AWS
# environment (CI role / env vars).
S3_RESOURCE = boto3.resource("s3")

# NB: In CI, a flaky test is usually retried 3 times, then the test file would be rerun
# 2 more times
MAX_RETRY_IN_NON_DISABLED_MODE = 3 * 3
|
|
|
|
|
|
def _get_request_headers() -> Dict[str, str]:
|
|
return {
|
|
"Accept": "application/vnd.github.v3+json",
|
|
"Authorization": "token " + os.environ["GITHUB_TOKEN"],
|
|
}
|
|
|
|
|
|
def _get_artifact_urls(prefix: str, workflow_run_id: int) -> Dict[Path, str]:
    """Get all workflow artifacts whose name starts with ``prefix``.

    Follows GitHub's pagination links and returns a mapping from artifact
    name (as a Path) to its archive download URL.
    """
    response = requests.get(
        f"{PYTORCH_REPO}/actions/runs/{workflow_run_id}/artifacts?per_page=100",
        # Bug fix: the first page was previously fetched unauthenticated while
        # the paginated follow-ups sent auth headers; authenticate consistently.
        headers=_get_request_headers(),
    )
    artifacts = response.json()["artifacts"]
    # GitHub paginates; keep following the "next" link until exhausted.
    while "next" in response.links.keys():
        response = requests.get(
            response.links["next"]["url"], headers=_get_request_headers()
        )
        artifacts.extend(response.json()["artifacts"])

    artifact_urls = {}
    for artifact in artifacts:
        if artifact["name"].startswith(prefix):
            artifact_urls[Path(artifact["name"])] = artifact["archive_download_url"]
    return artifact_urls
|
|
|
|
|
|
def _download_artifact(
    artifact_name: Path, artifact_url: str, workflow_run_attempt: int
) -> Path:
    """Download a single GitHub Actions artifact to ``artifact_name``.

    Returns the local path the artifact bytes were written to.
    """
    # [Artifact run attempt]
    # All artifacts on a workflow share a single namespace. However, we can
    # re-run a workflow and produce a new set of artifacts. To avoid name
    # collisions, we add `-runattempt1<run #>-` somewhere in the artifact name.
    #
    # This code parses out the run attempt number from the artifact name. If it
    # doesn't match the one specified on the command line, skip it.
    atoms = str(artifact_name).split("-")
    for atom in atoms:
        if atom.startswith("runattempt"):
            found_run_attempt = int(atom[len("runattempt") :])
            if workflow_run_attempt != found_run_attempt:
                # NOTE(review): the message says "Skipping", but there is no
                # `continue`/early return here, so the artifact is still
                # downloaded below. Confirm whether an actual skip was intended
                # (an early skip would change the return contract for callers
                # that append this function's result).
                print(
                    f"Skipping {artifact_name} as it is an invalid run attempt. "
                    f"Expected {workflow_run_attempt}, found {found_run_attempt}."
                )

    print(f"Downloading {artifact_name}")

    response = requests.get(artifact_url, headers=_get_request_headers())
    with open(artifact_name, "wb") as f:
        f.write(response.content)
    return artifact_name
|
|
|
|
|
|
def download_s3_artifacts(
    prefix: str, workflow_run_id: int, workflow_run_attempt: int
) -> List[Path]:
    """Download S3 artifacts for the given run/attempt that match ``prefix``.

    Each matching object is written to the current directory under its
    basename.  Returns the list of local paths; emits a GitHub Actions
    warning annotation when nothing matched.
    """
    s3_prefix = (
        f"pytorch/pytorch/{workflow_run_id}/{workflow_run_attempt}/artifact/{prefix}"
    )
    matching = S3_RESOURCE.Bucket("gha-artifacts").objects.filter(Prefix=s3_prefix)

    downloaded: List[Path] = []
    for obj in matching:
        local_path = Path(Path(obj.key).name)
        print(f"Downloading {local_path}")
        with open(local_path, "wb") as out:
            out.write(obj.get()["Body"].read())
        downloaded.append(local_path)

    # Empty result list means the loop never ran, i.e. no objects matched.
    if not downloaded:
        print(
            "::warning title=s3 artifacts not found::"
            "Didn't find any test reports in s3, there might be a bug!"
        )
    return downloaded
|
|
|
|
|
|
def download_gha_artifacts(
    prefix: str, workflow_run_id: int, workflow_run_attempt: int
) -> List[Path]:
    """Download every GitHub-hosted artifact matching ``prefix`` for this run."""
    urls_by_name = _get_artifact_urls(prefix, workflow_run_id)
    return [
        _download_artifact(Path(name), url, workflow_run_attempt)
        for name, url in urls_by_name.items()
    ]
|
|
|
|
|
|
def upload_to_rockset(
    collection: str, docs: List[Any], workspace: str = "commons"
) -> None:
    """Write ``docs`` to the given Rockset collection in ``workspace``."""
    print(f"Writing {len(docs)} documents to Rockset")
    # API key comes from the CI environment.
    rockset_client = rockset.RocksetClient(
        host="api.usw2a1.rockset.com", api_key=os.environ["ROCKSET_API_KEY"]
    )
    rockset_client.Documents.add_documents(
        collection=collection, data=docs, workspace=workspace
    )
    print("Done!")
|
|
|
|
|
|
def upload_to_s3(
    bucket_name: str,
    key: str,
    docs: List[Dict[str, Any]],
) -> None:
    """Serialize ``docs`` as gzipped newline-delimited JSON and put to S3."""
    print(f"Writing {len(docs)} documents to S3")
    # One JSON object per line, each followed by a newline (ndjson).
    ndjson = "".join(json.dumps(doc) + "\n" for doc in docs)

    S3_RESOURCE.Object(
        f"{bucket_name}",
        f"{key}",
    ).put(
        Body=gzip.compress(ndjson.encode()),
        ContentEncoding="gzip",
        ContentType="application/json",
    )
    print("Done!")
|
|
|
|
|
|
def read_from_s3(
    bucket_name: str,
    key: str,
) -> List[Dict[str, Any]]:
    """Fetch a gzipped ndjson object from S3 and parse it into a list of dicts."""
    print(f"Reading from s3://{bucket_name}/{key}")
    raw = S3_RESOURCE.Object(f"{bucket_name}", f"{key}").get()["Body"].read()
    decoded = gzip.decompress(raw).decode()
    # Skip blank lines (e.g. the trailing newline written by upload_to_s3).
    return [json.loads(line) for line in decoded.split("\n") if line]
|
|
|
|
|
|
def upload_workflow_stats_to_s3(
    workflow_run_id: int,
    workflow_run_attempt: int,
    collection: str,
    docs: List[Dict[str, Any]],
) -> None:
    """Upload workflow stats docs to the canonical raw-job-status bucket."""
    upload_to_s3(
        "ossci-raw-job-status",
        f"{collection}/{workflow_run_id}/{workflow_run_attempt}",
        docs,
    )
|
|
|
|
|
|
def upload_file_to_s3(
    file_name: str,
    bucket: str,
    key: str,
) -> None:
    """Upload a local file to S3."""
    print(f"Upload {file_name} to s3://{bucket}/{key}")
    s3_client = boto3.client("s3")
    s3_client.upload_file(file_name, bucket, key)
|
|
|
|
|
|
def unzip(p: Path) -> None:
    """Unzip the provided zipfile to a similarly-named directory.

    Looks like: /tmp/test-reports.zip -> /tmp/unzipped-test-reports/

    Raises AssertionError if `p` does not exist, and zipfile.BadZipFile if it
    is not a valid zip archive (the old docstring's "returns None for
    non-zipfiles" claim did not match the code).
    """
    assert p.is_file()
    unzipped_dir = p.with_name("unzipped-" + p.stem)
    print(f"Extracting {p} to {unzipped_dir}")

    # Renamed the handle so it no longer shadows the builtin `zip`.
    with zipfile.ZipFile(p, "r") as archive:
        archive.extractall(unzipped_dir)
|
|
|
|
|
|
def is_rerun_disabled_tests(tests: Dict[str, Dict[str, int]]) -> bool:
    """
    Check if the test report is coming from rerun_disabled_tests workflow where
    each test is run multiple times
    """
    # Every test must have more total runs than the normal-mode retry budget.
    for stats in tests.values():
        total_runs = stats.get("num_green", 0) + stats.get("num_red", 0)
        if total_runs <= MAX_RETRY_IN_NON_DISABLED_MODE:
            return False
    return True
|
|
|
|
|
|
def _convert_float_values_to_decimals(data: Dict[str, Any]) -> Dict[str, Any]:
|
|
return {k: Decimal(str(v)) if isinstance(v, float) else v for k, v in data.items()}
|
|
|
|
|
|
class EnvVarMetric:
    """A metric whose value is read (and optionally converted) from an env var."""

    name: str
    env_var: str
    required: bool = True
    # Used to cast the value of the env_var to the correct type (defaults to str)
    type_conversion_fn: Any = None

    def __init__(
        self,
        name: str,
        env_var: str,
        required: bool = True,
        type_conversion_fn: Any = None,
    ) -> None:
        self.name = name
        self.env_var = env_var
        self.required = required
        self.type_conversion_fn = type_conversion_fn

    def value(self) -> Any:
        """Read the env var, applying ``type_conversion_fn`` if set.

        Returns None when an optional variable is unset; raises ValueError
        when a required one is unset.
        """
        value = os.environ.get(self.env_var)
        if value is None:
            if self.required:
                # Bug fix: the two message fragments previously concatenated
                # without a separating space ("...GITHUB_JOBenvironment...").
                raise ValueError(
                    f"Missing {self.name}. Please set the {self.env_var} "
                    "environment variable to pass in this value."
                )
            # Bug fix: don't call the conversion fn on None for unset optional
            # vars (e.g. int(None) would raise TypeError).
            return None
        if self.type_conversion_fn:
            return self.type_conversion_fn(value)
        return value
|
|
|
|
|
|
def emit_metric(
    metric_name: str,
    metrics: Dict[str, Any],
) -> None:
    """
    Upload a metric to DynamoDB (and from there, Rockset).

    Parameters:
        metric_name:
            Name of the metric. Every unique metric should have a different name
            and be emitted just once per run attempt.
            Metrics are namespaced by their module and the function that emitted them.
        metrics: The actual data to record.

    Some default values are populated from environment variables, which must be set
    for metrics to be emitted. (If they're not set, this function becomes a noop):
    """

    if metrics is None:
        raise ValueError("You didn't ask to upload any metrics!")

    # We use these env vars to determine basic info about the workflow run.
    # By using env vars, we don't have to pass this info around to every function.
    # It also helps ensure that we only emit metrics during CI
    env_var_metrics = [
        EnvVarMetric("repo", "GITHUB_REPOSITORY"),
        EnvVarMetric("workflow", "GITHUB_WORKFLOW"),
        EnvVarMetric("build_environment", "BUILD_ENVIRONMENT"),
        EnvVarMetric("job", "GITHUB_JOB"),
        EnvVarMetric("test_config", "TEST_CONFIG", required=False),
        EnvVarMetric("run_id", "GITHUB_RUN_ID", type_conversion_fn=int),
        EnvVarMetric("run_number", "GITHUB_RUN_NUMBER", type_conversion_fn=int),
        EnvVarMetric("run_attempt", "GITHUB_RUN_ATTEMPT", type_conversion_fn=int),
    ]

    # Use info about the function that invoked this one as a namespace and a way to filter metrics.
    calling_frame = inspect.currentframe().f_back  # type: ignore[union-attr]
    calling_frame_info = inspect.getframeinfo(calling_frame)  # type: ignore[arg-type]
    calling_file = os.path.basename(calling_frame_info.filename)
    calling_module = inspect.getmodule(calling_frame).__name__  # type: ignore[union-attr]
    calling_function = calling_frame_info.function

    try:
        reserved_metrics = {
            "metric_name": metric_name,
            "calling_file": calling_file,
            "calling_module": calling_module,
            "calling_function": calling_function,
            # datetime.utcnow() is deprecated; now(timezone.utc) yields the
            # same formatted timestamp string.
            "timestamp": datetime.datetime.now(datetime.timezone.utc).strftime(
                "%Y-%m-%d %H:%M:%S.%f"
            ),
            **{m.name: m.value() for m in env_var_metrics},
        }
    except ValueError as e:
        # A required env var is missing: we're not in CI, so become a noop.
        warn(f"Not emitting metrics. {e}")
        return

    # Prefix key with metric name and timestamp to derisk chance of a uuid1 name collision
    reserved_metrics[
        "dynamo_key"
    ] = f"{metric_name}_{int(time.time())}_{uuid.uuid1().hex}"

    # Ensure the metrics dict doesn't contain any reserved keys.
    # Bug fix: the old code did `', '.join(key)`, which joined the *characters*
    # of a single key name instead of listing the offending keys.
    used_reserved_keys = sorted(reserved_metrics.keys() & metrics.keys())
    if used_reserved_keys:
        raise ValueError(
            f"Metrics dict contains reserved keys: [{', '.join(used_reserved_keys)}]"
        )

    # boto3 doesn't support uploading float values to DynamoDB, so convert them all to decimals.
    metrics = _convert_float_values_to_decimals(metrics)

    try:
        session = boto3.Session(region_name="us-east-1")
        session.resource("dynamodb").Table("torchci-metrics").put_item(
            Item={
                **reserved_metrics,
                **metrics,
            }
        )
    except Exception as e:
        # We don't want to fail the job if we can't upload the metric.
        # We still raise the ValueErrors outside this try block since those indicate improperly configured metrics
        warn(f"Error uploading metric to DynamoDB: {e}")
        return
|