Files
pytorch/tools/stats/upload_stats_lib.py
Huy Do cc775fb8c4 Upload torch dynamo perf stats to Rockset (#95675)
The new workflow runs after `inductor` or `inductor-perf-test-nightly` finishes on trunk (not on PRs). All test report CSV files are ingested into https://console.rockset.com/collections/details/inductor.torch_dynamo_perf_stats

### Testing

Run

* inductor-A100-perf
```
python -m tools.stats.upload_dynamo_perf_stats --workflow-run-id 4272892998 --workflow-run-attempt 1 --repo pytorch/pytorch
```

to ingest some data from 9b7abc4fac

Pull Request resolved: https://github.com/pytorch/pytorch/pull/95675
Approved by: https://github.com/weiwangmeta
2023-03-06 05:28:20 +00:00

196 lines
5.7 KiB
Python

import gzip
import io
import json
import os
import xml.etree.ElementTree as ET
import zipfile
from pathlib import Path
from typing import Any, Dict, List
import boto3 # type: ignore[import]
import requests
import rockset # type: ignore[import]
# Base URL of the GitHub REST API for the pytorch/pytorch repository.
PYTORCH_REPO = "https://api.github.com/repos/pytorch/pytorch"
# boto3 S3 resource, created once at import time and shared by the S3
# download/upload helpers in this file.
S3_RESOURCE = boto3.resource("s3")
# Substring that is_rerun_disabled_tests looks for in the message of a
# <skipped> element. NOTE(review): despite the name this looks like a command
# line flag rather than a workflow name -- confirm.
TARGET_WORKFLOW = "--rerun-disabled-tests"
def _get_request_headers() -> Dict[str, str]:
    """Build the HTTP headers used for authenticated GitHub API requests.

    The token is read from the ``GITHUB_TOKEN`` environment variable on every
    call; a ``KeyError`` is raised when that variable is not set.
    """
    github_token = os.environ["GITHUB_TOKEN"]
    headers = {"Accept": "application/vnd.github.v3+json"}
    headers["Authorization"] = f"token {github_token}"
    return headers
def _get_artifact_urls(prefix: str, workflow_run_id: int) -> Dict[Path, str]:
    """Map artifact names to download URLs for one GitHub workflow run.

    Lists the artifacts of ``workflow_run_id`` through the GitHub REST API,
    following the ``next`` pagination links until the listing is exhausted,
    and keeps only the artifacts whose name starts with ``prefix``.

    Args:
        prefix: Only artifacts whose name starts with this string are kept.
        workflow_run_id: ID of the GitHub Actions workflow run to inspect.

    Returns:
        A dict from artifact name (as a ``Path``) to its
        ``archive_download_url``.

    Raises:
        KeyError: if ``GITHUB_TOKEN`` is not set, or a response body has no
            ``"artifacts"`` entry (e.g. an API error payload).
    """
    # Authenticate every page, including the first one. Previously only the
    # follow-up pages sent the token, so the initial request was subject to
    # the much lower unauthenticated rate limit.
    headers = _get_request_headers()
    response = requests.get(
        f"{PYTORCH_REPO}/actions/runs/{workflow_run_id}/artifacts?per_page=100",
        headers=headers,
    )
    artifacts = response.json()["artifacts"]
    while "next" in response.links:
        response = requests.get(response.links["next"]["url"], headers=headers)
        artifacts.extend(response.json()["artifacts"])

    return {
        Path(artifact["name"]): artifact["archive_download_url"]
        for artifact in artifacts
        if artifact["name"].startswith(prefix)
    }
def _download_artifact(
    artifact_name: Path, artifact_url: str, workflow_run_attempt: int
) -> Path:
    """Download one GitHub artifact archive into the current directory.

    The archive is fetched from ``artifact_url`` with the authenticated
    request headers and written to a local file named ``artifact_name``,
    which is also the return value.
    """
    # [Artifact run attempt]
    # Every artifact of a workflow lives in one shared namespace, yet a
    # workflow can be re-run and emit a fresh set of artifacts. To keep names
    # unique, a `-runattempt<run #>-` component is embedded in the artifact
    # name, and it is parsed back out here to compare with the requested
    # attempt.
    #
    # NOTE(review): a mismatch is only reported; the download below happens
    # regardless, because this function always returns a Path. Callers that
    # really want to skip other attempts have to filter before calling.
    marker = "runattempt"
    for piece in str(artifact_name).split("-"):
        if not piece.startswith(marker):
            continue
        found_run_attempt = int(piece[len(marker) :])
        if found_run_attempt == workflow_run_attempt:
            continue
        print(
            f"Skipping {artifact_name} as it is an invalid run attempt. "
            f"Expected {workflow_run_attempt}, found {found_run_attempt}."
        )

    print(f"Downloading {artifact_name}")
    payload = requests.get(artifact_url, headers=_get_request_headers()).content
    with open(artifact_name, "wb") as out_file:
        out_file.write(payload)
    return artifact_name
def download_s3_artifacts(
    prefix: str, workflow_run_id: int, workflow_run_attempt: int
) -> List[Path]:
    """Download the matching artifacts of a workflow run from S3.

    Every object in the ``gha-artifacts`` bucket under
    ``pytorch/pytorch/<run id>/<run attempt>/artifact/<prefix>`` is written to
    the current directory under its base name. A GitHub Actions warning
    annotation is printed when nothing matched.

    Returns:
        The local paths of the downloaded files, in listing order.
    """
    key_prefix = (
        f"pytorch/pytorch/{workflow_run_id}/{workflow_run_attempt}/artifact/{prefix}"
    )
    matching_objects = S3_RESOURCE.Bucket("gha-artifacts").objects.filter(
        Prefix=key_prefix
    )

    downloaded: List[Path] = []
    for s3_object in matching_objects:
        # Keep only the final component of the key as the local file name.
        local_path = Path(Path(s3_object.key).name)
        print(f"Downloading {local_path}")
        with open(local_path, "wb") as out_file:
            out_file.write(s3_object.get()["Body"].read())
        downloaded.append(local_path)

    if not downloaded:
        print(
            "::warning title=s3 artifacts not found::"
            "Didn't find any test reports in s3, there might be a bug!"
        )
    return downloaded
def _matches_run_attempt(artifact_name: Path, workflow_run_attempt: int) -> bool:
    """Tell whether an artifact name is compatible with the given run attempt.

    Returns False only when the name carries a ``runattempt<N>`` component
    whose number differs from ``workflow_run_attempt``; names without such a
    component are accepted.
    """
    marker = "runattempt"
    for atom in str(artifact_name).split("-"):
        if atom.startswith(marker) and int(atom[len(marker) :]) != workflow_run_attempt:
            return False
    return True


def download_gha_artifacts(
    prefix: str, workflow_run_id: int, workflow_run_attempt: int
) -> List[Path]:
    """Download the GitHub artifacts of one workflow run attempt.

    Args:
        prefix: Only artifacts whose name starts with this string are fetched.
        workflow_run_id: ID of the GitHub Actions workflow run.
        workflow_run_attempt: Run attempt whose artifacts are wanted.

    Returns:
        The local paths of the downloaded archives.
    """
    artifact_urls = _get_artifact_urls(prefix, workflow_run_id)
    paths = []
    for name, url in artifact_urls.items():
        # All attempts of a run share one artifact namespace, so artifacts
        # produced by other attempts must be filtered out here: the download
        # helper always fetches whatever it is given.
        if not _matches_run_attempt(name, workflow_run_attempt):
            print(
                f"Skipping {name} as it does not belong to run attempt "
                f"{workflow_run_attempt}."
            )
            continue
        paths.append(_download_artifact(Path(name), url, workflow_run_attempt))
    return paths
def upload_to_rockset(
    collection: str, docs: List[Any], workspace: str = "commons"
) -> None:
    """Add ``docs`` to a Rockset collection.

    The API key is taken from the ``ROCKSET_API_KEY`` environment variable.

    Args:
        collection: Name of the target Rockset collection.
        docs: Documents to add.
        workspace: Rockset workspace that holds the collection.
    """
    doc_count = len(docs)
    print(f"Writing {doc_count} documents to Rockset")

    rockset_client = rockset.RocksetClient(
        host="api.usw2a1.rockset.com",
        api_key=os.environ["ROCKSET_API_KEY"],
    )
    rockset_client.Documents.add_documents(
        collection=collection, data=docs, workspace=workspace
    )
    print("Done!")
def upload_to_s3(
    bucket_name: str,
    key: str,
    docs: List[Dict[str, Any]],
) -> None:
    """Store ``docs`` in S3 as gzip-compressed, newline-delimited JSON.

    Each document is serialized onto its own line; the result is compressed
    and put at ``key`` in ``bucket_name`` with gzip content encoding and a
    JSON content type.
    """
    print(f"Writing {len(docs)} documents to S3")

    # One JSON document per line, each line terminated by a newline.
    ndjson = "".join(f"{json.dumps(doc)}\n" for doc in docs)

    s3_object = S3_RESOURCE.Object(f"{bucket_name}", f"{key}")
    s3_object.put(
        Body=gzip.compress(ndjson.encode()),
        ContentEncoding="gzip",
        ContentType="application/json",
    )
    print("Done!")
def upload_workflow_stats_to_s3(
    workflow_run_id: int,
    workflow_run_attempt: int,
    collection: str,
    docs: List[Dict[str, Any]],
) -> None:
    """Upload workflow stats documents to the ``ossci-raw-job-status`` bucket.

    The documents are stored under the key
    ``<collection>/<workflow_run_id>/<workflow_run_attempt>``.
    """
    upload_to_s3(
        "ossci-raw-job-status",
        f"{collection}/{workflow_run_id}/{workflow_run_attempt}",
        docs,
    )
def upload_file_to_s3(
    file_name: str,
    bucket: str,
    key: str,
) -> None:
    """
    Copy the local file ``file_name`` to ``s3://<bucket>/<key>``.

    A new boto3 S3 client is created for each call.
    """
    print(f"Upload {file_name} to s3://{bucket}/{key}")
    s3_client = boto3.client("s3")
    s3_client.upload_file(file_name, bucket, key)
def unzip(p: Path) -> None:
    """Extract the zip archive ``p`` into a similarly-named sibling directory.

    Looks like: /tmp/test-reports.zip -> /tmp/unzipped-test-reports/

    Args:
        p: Path to an existing zip archive.

    Raises:
        AssertionError: if ``p`` is not an existing regular file.
        zipfile.BadZipFile: if ``p`` exists but is not a valid zip archive.
    """
    assert p.is_file(), f"{p} is not a file"
    unzipped_dir = p.with_name("unzipped-" + p.stem)
    print(f"Extracting {p} to {unzipped_dir}")

    # Named `archive` rather than `zip` so the builtin is not shadowed.
    with zipfile.ZipFile(p, "r") as archive:
        archive.extractall(unzipped_dir)
def is_rerun_disabled_tests(root: ET.ElementTree) -> bool:
    """
    Tell whether a test report was produced by the rerun_disabled_tests workflow.

    The first ``skipped`` element matched by the lookup is inspected; the report
    counts as a rerun when that element's ``message`` attribute contains
    ``TARGET_WORKFLOW`` or the string ``num_red``.
    """
    skipped_node = root.find(".//*skipped")
    # Compare with None explicitly: an element's truth value depends on whether
    # it has children, so `if not skipped_node` would give the wrong answer.
    if skipped_node is not None:
        text = skipped_node.attrib.get("message", "")
        return TARGET_WORKFLOW in text or "num_red" in text
    return False