Files
pytorch/tools/stats/upload_external_contrib_stats.py
PaliC c03555a303 add retries to external contribution data upload (#100889)
Adds retries to external contribution upload as it is shown to be flaky

<!--
copilot:summary
-->
### <samp>🤖 Generated by Copilot at 43c2602</samp>

Added a function to read data from S3 objects and used it to implement a retry mechanism and verification for uploading external contribution stats. Modified `tools/stats/upload_external_contrib_stats.py` and `tools/stats/upload_stats_lib.py`.
<!--
copilot:poem
-->
### <samp>🤖 Generated by Copilot at 43c2602</samp>

> _We'll upload the stats to the cloud, me hearties_
> _We'll use `read_from_s3` to check them all_
> _We'll retry if the connection fails, me hearties_
> _We'll log the results and have a ball_

Pull Request resolved: https://github.com/pytorch/pytorch/pull/100889
Approved by: https://github.com/huydhn
2023-05-16 05:00:48 +00:00

163 lines
5.3 KiB
Python

import argparse
import datetime
import json
import os
import urllib.parse
from typing import Any, Callable, cast, Dict, List, Optional, Set
from urllib.error import HTTPError
from urllib.request import Request, urlopen
# import time
from tools.stats.upload_stats_lib import read_from_s3, upload_to_s3
FILTER_OUT_USERS = {"pytorchmergebot", "facebook-github-bot", "pytorch-bot[bot]"}
MAXIMUM_RETRIES = 5
def _fetch_url(
url: str,
headers: Dict[str, str],
data: Optional[Dict[str, Any]] = None,
method: Optional[str] = None,
reader: Callable[[Any], Any] = lambda x: x.read(),
) -> Any:
token = os.environ.get("GITHUB_TOKEN")
if token is not None and url.startswith("https://api.github.com/"):
headers["Authorization"] = f"token {token}"
data_ = json.dumps(data).encode() if data is not None else None
try:
with urlopen(Request(url, headers=headers, data=data_, method=method)) as conn:
return reader(conn)
except HTTPError as err:
print(err.reason)
print(err.headers)
if err.code == 403 and all(
key in err.headers for key in ["X-RateLimit-Limit", "X-RateLimit-Used"]
):
print(
f"Rate limit exceeded: {err.headers['X-RateLimit-Used']}/{err.headers['X-RateLimit-Limit']}"
)
raise
def fetch_json(
url: str,
params: Optional[Dict[str, Any]] = None,
data: Optional[Dict[str, Any]] = None,
) -> List[Dict[str, Any]]:
headers = {"Accept": "application/vnd.github.v3+json"}
if params is not None and len(params) > 0:
url += "?" + "&".join(
f"{name}={urllib.parse.quote(str(val))}" for name, val in params.items()
)
return cast(
List[Dict[str, Any]],
_fetch_url(url, headers=headers, data=data, reader=json.load),
)
def get_external_pr_data(
start_date: datetime.date, end_date: datetime.date, period_length: int = 1
) -> List[Dict[str, Any]]:
pr_info = []
period_begin_date = start_date
pr_count = 0
users: Set[str] = set()
while period_begin_date < end_date:
period_end_date = period_begin_date + datetime.timedelta(days=period_length - 1)
page = 1
responses: List[Dict[str, Any]] = []
while len(responses) > 0 or page == 1:
response = cast(
Dict[str, Any],
fetch_json(
"https://api.github.com/search/issues",
params={
"q": f'repo:pytorch/pytorch is:pr is:closed \
label:"open source" label:Merged -label:Reverted closed:{period_begin_date}..{period_end_date}',
"per_page": "100",
"page": str(page),
},
),
)
items = response["items"]
for item in items:
u = item["user"]["login"]
if u not in FILTER_OUT_USERS:
pr_count += 1
users.add(u)
page += 1
pr_info.append(
{
"date": str(period_begin_date),
"pr_count": pr_count,
"user_count": len(users),
"users": list(users),
}
)
period_begin_date = period_end_date + datetime.timedelta(days=1)
return pr_info
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Upload external contribution stats to Rockset"
)
parser.add_argument(
"--startDate",
type=datetime.date.fromisoformat,
required=True,
help="the first date to upload data for in any valid ISO 8601 format format (eg. YYYY-MM-DD).",
)
parser.add_argument(
"--length",
type=int,
required=False,
help="the number of days to upload data for. Default is 1.",
default=1,
)
parser.add_argument(
"--period-length",
type=int,
required=False,
help="the number of days to group data for. Default is 1.",
default=1,
)
args = parser.parse_args()
for i in range(args.length):
tries = 0
startdate = args.startDate + datetime.timedelta(days=i)
data = get_external_pr_data(
startdate,
startdate + datetime.timedelta(days=args.period_length),
period_length=args.period_length,
)
while tries < MAXIMUM_RETRIES:
success = True
upload_to_s3(
bucket_name="torchci-contribution-data",
key=f"external_contribution_counts/{str(startdate)}",
docs=data,
)
uploaded_data = read_from_s3(
"torchci-contribution-data",
f"external_contribution_counts/{str(startdate)}",
)
for doc in data:
if doc not in uploaded_data:
tries += 1
print(
f"Failed to upload data retrying upload up to {MAXIMUM_RETRIES - tries} more times"
)
success = False
break
if success:
break
# uncomment when running large queries locally to avoid github's rate limiting
#
# import time
# time.sleep(20)