[ci] upload test stats to s3 instead of rockset directly (#80593)

Previously, we wrote documents to Rockset directly using its write API.
This turned out to be a source of problems: the writes occupied Rockset
leaf CPU and caused other queries to time out. We are also starting to
get rate-limited by Rockset, leading to data loss.

Instead, write test stats to S3 and let Rockset's managed integration
sync them in. This appears to be significantly more efficient, and it
should fundamentally solve our throughput issues.
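
For illustration, the stats land in S3 as gzipped, newline-delimited JSON under a `<collection>/<workflow_run_id>/<workflow_run_attempt>` key. Here is a minimal sketch of reading one such object back with boto3; `read_stats` is a hypothetical helper (not part of this PR), assuming the bucket name and key layout introduced below:

```python
import gzip
import json

import boto3

# Hypothetical helper, not part of this PR: reads back one stats object,
# assuming the bucket name and key layout used by upload_to_s3 below.
def read_stats(collection: str, workflow_run_id: int, workflow_run_attempt: int) -> list:
    obj = boto3.resource("s3").Object(
        "ossci-raw-job-status",
        f"{collection}/{workflow_run_id}/{workflow_run_attempt}",
    )
    # S3 does not decompress on GET; undo the gzip ContentEncoding ourselves.
    raw = gzip.decompress(obj.get()["Body"].read())
    # One JSON document per line (newline-delimited JSON).
    return [json.loads(line) for line in raw.splitlines() if line]
```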

Hopefully we can re-enable per-PR stats after this change, but let's see
how it does first.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/80593
Approved by: https://github.com/janeyx99
Author: Michael Suo
Date: 2022-06-30 00:27:44 -07:00
Committed by: PyTorch MergeBot
Parent: 682c0d2615
Commit: 4e12300e4e
2 changed files with 41 additions and 6 deletions

tools/stats/upload_stats_lib.py

@@ -1,11 +1,14 @@
 import gzip
 import io
 import json
 import os
-import requests
 import zipfile
 from pathlib import Path
-from typing import Dict, List, Any
-import rockset  # type: ignore[import]
+from typing import Any, Dict, List
+
+import boto3  # type: ignore[import]
+import requests
+import rockset  # type: ignore[import]
 
 PYTORCH_REPO = "https://api.github.com/repos/pytorch/pytorch"
+S3_RESOURCE = boto3.resource("s3")
@@ -110,6 +113,29 @@ def upload_to_rockset(collection: str, docs: List[Any]) -> None:
     print("Done!")
+def upload_to_s3(
+    workflow_run_id: int,
+    workflow_run_attempt: int,
+    collection: str,
+    docs: List[Dict[str, Any]],
+) -> None:
+    print(f"Writing {len(docs)} documents to S3")
+    body = io.StringIO()
+    for doc in docs:
+        json.dump(doc, body)
+        body.write("\n")
+
+    S3_RESOURCE.Object(
+        "ossci-raw-job-status",
+        f"{collection}/{workflow_run_id}/{workflow_run_attempt}",
+    ).put(
+        Body=gzip.compress(body.getvalue().encode()),
+        ContentEncoding="gzip",
+        ContentType="application/json",
+    )
+    print("Done!")
 
 def unzip(p: Path) -> None:
     """Unzip the provided zipfile to a similarly-named directory.

tools/stats/upload_test_stats.py

@@ -8,7 +8,7 @@ from tempfile import TemporaryDirectory
 from tools.stats.upload_stats_lib import (
     download_gha_artifacts,
     download_s3_artifacts,
-    upload_to_rockset,
+    upload_to_s3,
     unzip,
 )
@@ -207,8 +207,17 @@ if __name__ == "__main__":
     # For PRs, only upload a summary of test_runs. This helps lower the
     # volume of writes we do to Rockset.
-    upload_to_rockset("test_run_summary", summarize_test_cases(test_cases))
+    upload_to_s3(
+        args.workflow_run_id,
+        args.workflow_run_attempt,
+        "test_run_summary",
+        summarize_test_cases(test_cases),
+    )
+    # upload_to_rockset("test_run_summary", summarize_test_cases(test_cases))
 
     if args.head_branch == "master":
         # For master jobs, upload everything.
-        upload_to_rockset("test_run", test_cases)
+        upload_to_s3(
+            args.workflow_run_id, args.workflow_run_attempt, "test_run", test_cases
+        )