[CI] upload_metrics function to upload to s3 instead of dynamo (#136799)

* Changes the upload_metrics function to upload to the ossci-raw-job-status S3 bucket instead of DynamoDB
* Moves all user-added metrics into a field called "info", so ingesting into a database table with a strict schema is easier (see the sketch below)
* Removes the dynamo_key field, since it is no longer needed
* Removes the concept of reserved metrics, since user-added metrics can no longer overwrite them
* Moves S3 resource initialization behind a function so the module imports faster
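
A minimal sketch of the document shape the reworked emit_metric uploads, pieced together from the diff below (the default_metrics fields shown here are a subset; the real dict also carries calling_file, repo info, etc.):

```python
import time
import uuid

# Sketch: user-supplied metrics are nested under "info"; the top-level
# fields form a fixed schema that a strict database table can ingest.
metric_name = "some_metric"
metrics = {"some_number": 123, "float_number": 32.34}  # user-added metrics
default_metrics = {"metric_name": metric_name}         # plus calling_file, repo, ...

# One S3 object per emitted metric, keyed to avoid uuid1 name collisions.
s3_key = f"{metric_name}_{int(time.time())}_{uuid.uuid1().hex}"
doc = {**default_metrics, "info": metrics}
# upload_to_s3(bucket_name="ossci-raw-job-status",
#              key=f"ossci_uploaded_metrics/{s3_key}", docs=[doc])
```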
---
Tested by emitting a metric during run_test and confirming that the documents were added to S3.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/136799
Approved by: https://github.com/ZainRizvi
Catherine Lee
2024-10-02 23:19:28 +00:00
committed by PyTorch MergeBot
parent 2c9e194e23
commit 235f7e06f4
3 changed files with 64 additions and 137 deletions

View File: tools/stats/upload_metrics.py

@@ -6,7 +6,6 @@ import os
import time
import uuid
from datetime import timezone
from decimal import Decimal
from typing import Any
from warnings import warn
@@ -17,18 +16,12 @@ from warnings import warn
# worry about it.
EMIT_METRICS = False
try:
import boto3 # type: ignore[import]
from tools.stats.upload_stats_lib import upload_to_s3
EMIT_METRICS = True
except ImportError as e:
print(f"Unable to import boto3. Will not be emitting metrics.... Reason: {e}")
# Sometimes our runner machines are located in one AWS account while the metrics table may be in
# another, so we need to specify the table's ARN explicitly.
TORCHCI_METRICS_TABLE_ARN = (
"arn:aws:dynamodb:us-east-1:308535385114:table/torchci-metrics"
)
class EnvVarMetric:
name: str
@@ -133,7 +126,7 @@ def emit_metric(
calling_function = calling_frame_info.function
try:
reserved_metrics = {
default_metrics = {
"metric_name": metric_name,
"calling_file": calling_file,
"calling_module": calling_module,
@@ -148,27 +141,14 @@ def emit_metric(
return
# Prefix key with metric name and timestamp to reduce the risk of a uuid1 name collision
reserved_metrics[
"dynamo_key"
] = f"{metric_name}_{int(time.time())}_{uuid.uuid1().hex}"
# Ensure the metrics dict doesn't contain any reserved keys
for key in reserved_metrics.keys():
used_reserved_keys = [k for k in metrics.keys() if k == key]
if used_reserved_keys:
raise ValueError(f"Metrics dict contains reserved keys: [{', '.join(key)}]")
# boto3 doesn't support uploading float values to DynamoDB, so convert them all to decimals.
metrics = _convert_float_values_to_decimals(metrics)
s3_key = f"{metric_name}_{int(time.time())}_{uuid.uuid1().hex}"
if EMIT_METRICS:
try:
session = boto3.Session(region_name="us-east-1")
session.resource("dynamodb").Table(TORCHCI_METRICS_TABLE_ARN).put_item(
Item={
**reserved_metrics,
**metrics,
}
upload_to_s3(
bucket_name="ossci-raw-job-status",
key=f"ossci_uploaded_metrics/{s3_key}",
docs=[{**default_metrics, "info": metrics}],
)
except Exception as e:
# We don't want to fail the job if we can't upload the metric.
@@ -177,19 +157,3 @@ def emit_metric(
return
else:
print(f"Not emitting metrics for {metric_name}. Boto wasn't imported.")
def _convert_float_values_to_decimals(data: dict[str, Any]) -> dict[str, Any]:
# Attempt to recurse
def _helper(o: Any) -> Any:
if isinstance(o, float):
return Decimal(str(o))
if isinstance(o, list):
return [_helper(v) for v in o]
if isinstance(o, dict):
return {_helper(k): _helper(v) for k, v in o.items()}
if isinstance(o, tuple):
return tuple(_helper(v) for v in o)
return o
return {k: _helper(v) for k, v in data.items()}
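
The removed _convert_float_values_to_decimals helper existed only because boto3's DynamoDB serializer rejects Python floats; JSON documents bound for S3 have no such restriction, so the whole Decimal dance can go. A quick illustration:

```python
import json
from decimal import Decimal

# Floats serialize to JSON directly...
print(json.dumps({"float_number": 32.34, "nested": [{"ratio": 0.5}]}))

# ...whereas Decimal (which DynamoDB required) is not JSON-serializable
# by default, so keeping the conversion would now be actively harmful.
try:
    json.dumps({"float_number": Decimal("32.34")})
except TypeError as e:
    print(e)  # Object of type Decimal is not JSON serializable
```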

View File: tools/stats/upload_stats_lib.py

@@ -7,6 +7,7 @@ import math
import os
import time
import zipfile
from functools import lru_cache
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional
@@ -16,7 +17,12 @@ import rockset # type: ignore[import]
PYTORCH_REPO = "https://api.github.com/repos/pytorch/pytorch"
S3_RESOURCE = boto3.resource("s3")
@lru_cache
def get_s3_resource() -> Any:
return boto3.resource("s3")
# NB: In CI, a flaky test is usually retried 3 times, then the test file would be rerun
# 2 more times
@@ -83,7 +89,7 @@ def _download_artifact(
def download_s3_artifacts(
prefix: str, workflow_run_id: int, workflow_run_attempt: int
) -> list[Path]:
bucket = S3_RESOURCE.Bucket("gha-artifacts")
bucket = get_s3_resource().Bucket("gha-artifacts")
objs = bucket.objects.filter(
Prefix=f"pytorch/pytorch/{workflow_run_id}/{workflow_run_attempt}/artifact/{prefix}"
)
@@ -172,7 +178,7 @@ def upload_to_s3(
json.dump(doc, body)
body.write("\n")
S3_RESOURCE.Object(
get_s3_resource().Object(
f"{bucket_name}",
f"{key}",
).put(
@@ -189,7 +195,8 @@ def read_from_s3(
) -> list[dict[str, Any]]:
print(f"Reading from s3://{bucket_name}/{key}")
body = (
S3_RESOURCE.Object(
get_s3_resource()
.Object(
f"{bucket_name}",
f"{key}",
)
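
The get_s3_resource change in this file is the standard functools.lru_cache lazy-initialization pattern: the boto3 resource (and its credential lookup) is only built on first call, so merely importing the module stays fast. A standalone sketch, with a hypothetical get_resource as a stand-in:

```python
from functools import lru_cache


@lru_cache
def get_resource() -> object:
    print("initializing...")  # runs exactly once, on first use
    return object()           # stand-in for boto3.resource("s3")


r1 = get_resource()   # prints "initializing..."
r2 = get_resource()   # served from the cache; nothing printed
assert r1 is r2
get_resource.cache_clear()  # what the tests' setUp() does to isolate test cases
```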

View File: tools/test/test_upload_stats_lib.py

@@ -1,20 +1,25 @@
from __future__ import annotations
import decimal
import gzip
import inspect
import json
import sys
import unittest
from pathlib import Path
from typing import Any
from typing import Any, Dict
from unittest import mock
REPO_ROOT = Path(__file__).resolve().parent.parent.parent
sys.path.insert(0, str(REPO_ROOT))
from tools.stats.upload_metrics import add_global_metric, emit_metric
from tools.stats.upload_stats_lib import BATCH_SIZE, remove_nan_inf, upload_to_rockset
from tools.stats.upload_metrics import add_global_metric, emit_metric, global_metrics
from tools.stats.upload_stats_lib import (
BATCH_SIZE,
get_s3_resource,
remove_nan_inf,
upload_to_rockset,
)
sys.path.remove(str(REPO_ROOT))
@@ -33,9 +38,22 @@ JOB_ID = 234
JOB_NAME = "some-job-name"
@mock.patch("boto3.resource")
class TestUploadStats(unittest.TestCase):
emitted_metric: Dict[str, Any] = {"did_not_emit": True}
def mock_put_item(self, **kwargs: Any) -> None:
# Utility for mocking putting items into s3. This will save the emitted
# metric so tests can check it
self.emitted_metric = json.loads(
gzip.decompress(kwargs["Body"]).decode("utf-8")
)
# Before each test, set the env vars to their default values
def setUp(self) -> None:
get_s3_resource.cache_clear()
global_metrics.clear()
mock.patch.dict(
"os.environ",
{
@@ -54,7 +72,6 @@ class TestUploadStats(unittest.TestCase):
clear=True, # Don't read any preset env vars
).start()
@mock.patch("boto3.Session.resource")
def test_emits_default_and_given_metrics(self, mock_resource: Any) -> None:
metric = {
"some_number": 123,
@@ -79,29 +96,20 @@ class TestUploadStats(unittest.TestCase):
"run_id": RUN_ID,
"run_number": RUN_NUMBER,
"run_attempt": RUN_ATTEMPT,
"some_number": 123,
"float_number": decimal.Decimal(str(32.34)),
"job_id": JOB_ID,
"job_name": JOB_NAME,
"info": metric,
}
# Preserve the metric emitted
emitted_metric: dict[str, Any] = {}
def mock_put_item(Item: dict[str, Any]) -> None:
nonlocal emitted_metric
emitted_metric = Item
mock_resource.return_value.Table.return_value.put_item = mock_put_item
mock_resource.return_value.Object.return_value.put = self.mock_put_item
emit_metric("metric_name", metric)
self.assertEqual(
emitted_metric,
{**emit_should_include, **emitted_metric},
self.emitted_metric,
{**self.emitted_metric, **emit_should_include},
)
@mock.patch("boto3.Session.resource")
def test_when_global_metric_specified_then_it_emits_it(
self, mock_resource: Any
) -> None:
@@ -119,23 +127,15 @@ class TestUploadStats(unittest.TestCase):
global_metric_name: global_metric_value,
}
# Preserve the metric emitted
emitted_metric: dict[str, Any] = {}
def mock_put_item(Item: dict[str, Any]) -> None:
nonlocal emitted_metric
emitted_metric = Item
mock_resource.return_value.Table.return_value.put_item = mock_put_item
mock_resource.return_value.Object.return_value.put = self.mock_put_item
emit_metric("metric_name", metric)
self.assertEqual(
emitted_metric,
{**emitted_metric, **emit_should_include},
self.emitted_metric,
{**self.emitted_metric, "info": emit_should_include},
)
@mock.patch("boto3.Session.resource")
def test_when_local_and_global_metric_specified_then_global_is_overridden(
self, mock_resource: Any
) -> None:
@@ -155,23 +155,15 @@ class TestUploadStats(unittest.TestCase):
global_metric_name: local_override,
}
# Preserve the metric emitted
emitted_metric: dict[str, Any] = {}
def mock_put_item(Item: dict[str, Any]) -> None:
nonlocal emitted_metric
emitted_metric = Item
mock_resource.return_value.Table.return_value.put_item = mock_put_item
mock_resource.return_value.Object.return_value.put = self.mock_put_item
emit_metric("metric_name", metric)
self.assertEqual(
emitted_metric,
{**emitted_metric, **emit_should_include},
self.emitted_metric,
{**self.emitted_metric, "info": emit_should_include},
)
@mock.patch("boto3.Session.resource")
def test_when_optional_envvar_set_to_actual_value_then_emit_vars_emits_it(
self, mock_resource: Any
) -> None:
@@ -180,7 +172,7 @@ class TestUploadStats(unittest.TestCase):
}
emit_should_include = {
**metric,
"info": {**metric},
"pr_number": PR_NUMBER,
}
@@ -191,23 +183,15 @@ class TestUploadStats(unittest.TestCase):
},
).start()
# Preserve the metric emitted
emitted_metric: dict[str, Any] = {}
def mock_put_item(Item: dict[str, Any]) -> None:
nonlocal emitted_metric
emitted_metric = Item
mock_resource.return_value.Table.return_value.put_item = mock_put_item
mock_resource.return_value.Object.return_value.put = self.mock_put_item
emit_metric("metric_name", metric)
self.assertEqual(
emitted_metric,
{**emit_should_include, **emitted_metric},
self.emitted_metric,
{**self.emitted_metric, **emit_should_include},
)
@mock.patch("boto3.Session.resource")
def test_when_optional_envvar_set_to_a_empty_str_then_emit_vars_ignores_it(
self, mock_resource: Any
) -> None:
@@ -224,35 +208,20 @@ class TestUploadStats(unittest.TestCase):
},
).start()
# Preserve the metric emitted
emitted_metric: dict[str, Any] = {}
def mock_put_item(Item: dict[str, Any]) -> None:
nonlocal emitted_metric
emitted_metric = Item
mock_resource.return_value.Table.return_value.put_item = mock_put_item
mock_resource.return_value.Object.return_value.put = self.mock_put_item
emit_metric("metric_name", metric)
self.assertEqual(
emitted_metric,
{**emit_should_include, **emitted_metric},
self.emitted_metric,
{**self.emitted_metric, "info": emit_should_include},
f"Metrics should be emitted when an option parameter is set to '{default_val}'",
)
self.assertFalse(
emitted_metric.get("pr_number"),
self.emitted_metric.get("pr_number"),
f"Metrics should not include optional item 'pr_number' when it's envvar is set to '{default_val}'",
)
@mock.patch("boto3.Session.resource")
def test_blocks_emission_if_reserved_keyword_used(self, mock_resource: Any) -> None:
metric = {"repo": "awesome/repo"}
with self.assertRaises(ValueError):
emit_metric("metric_name", metric)
@mock.patch("boto3.Session.resource")
def test_no_metrics_emitted_if_required_env_var_not_set(
self, mock_resource: Any
) -> None:
@@ -267,19 +236,12 @@ class TestUploadStats(unittest.TestCase):
clear=True,
).start()
put_item_invoked = False
def mock_put_item(Item: dict[str, Any]) -> None:
nonlocal put_item_invoked
put_item_invoked = True
mock_resource.return_value.Table.return_value.put_item = mock_put_item
mock_resource.return_value.Object.return_value.put = self.mock_put_item
emit_metric("metric_name", metric)
self.assertFalse(put_item_invoked)
self.assertTrue(self.emitted_metric["did_not_emit"])
@mock.patch("boto3.Session.resource")
def test_no_metrics_emitted_if_required_env_var_set_to_empty_string(
self, mock_resource: Any
) -> None:
@@ -292,19 +254,13 @@ class TestUploadStats(unittest.TestCase):
},
).start()
put_item_invoked = False
def mock_put_item(Item: dict[str, Any]) -> None:
nonlocal put_item_invoked
put_item_invoked = True
mock_resource.return_value.Table.return_value.put_item = mock_put_item
mock_resource.return_value.Object.return_value.put = self.mock_put_item
emit_metric("metric_name", metric)
self.assertFalse(put_item_invoked)
self.assertTrue(self.emitted_metric["did_not_emit"])
def test_upload_to_rockset_batch_size(self) -> None:
def test_upload_to_rockset_batch_size(self, _mocked_resource: Any) -> None:
cases = [
{
"batch_size": BATCH_SIZE - 1,
@@ -336,7 +292,7 @@ class TestUploadStats(unittest.TestCase):
expected_number_of_requests,
)
def test_remove_nan_inf(self) -> None:
def test_remove_nan_inf(self, _mocked_resource: Any) -> None:
checks = [
(float("inf"), '"inf"', "Infinity"),
(float("nan"), '"nan"', "NaN"),