Upload perf benchmark to Rockset in batches of at most 5000 records (#107095)

TIL, uploading to Rockset has an upper limit of 5000 records per request. So uploading PT2 perf benchmark results could fail when that limit was exceeded, for example https://github.com/pytorch/pytorch/actions/runs/5828810421/job/15849232756:

```
HTTP response body: {"message":"The number of documents specified in this request exceeds the maximum allowed limit of 5,000 documents.","message_key":"RECEIVER_REQUEST_MAX_DOCUMENT_LIMIT","type":"INVALIDINPUT","line":null,"column":null,"trace_id":"73fc2eb5-cfd1-4baa-8141-47c7cde87812","error_id":null,"query_id":null,"internal_errors":null}
```

The fix is to upload the results in multiple smaller batches of at most 5000 records.
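
For illustration, here is a minimal standalone sketch of the batching pattern (the `split_into_batches` helper is hypothetical; the actual change to `upload_to_rockset` is in the diff below):

```python
from typing import Any, List

BATCH_SIZE = 5000  # Rockset's per-request document limit


def split_into_batches(docs: List[Any]) -> List[List[Any]]:
    """Slice docs into consecutive chunks of at most BATCH_SIZE records."""
    return [docs[i : i + BATCH_SIZE] for i in range(0, len(docs), BATCH_SIZE)]


# The failing run above had 5743 records, which split into 5000 + 743
assert [len(b) for b in split_into_batches(list(range(5743)))] == [5000, 743]
```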

### Testing

The 5743 records from https://github.com/pytorch/pytorch/actions/runs/5828810421/job/15849232756 were written in 2 batches (5000 + 743):

```
python3 -m tools.stats.upload_dynamo_perf_stats --workflow-run-id 5821183777 --workflow-run-attempt 1 --repo pytorch/pytorch --head-branch gh/ezyang/2294/head
...
Writing 5000 documents to Rockset
Done!
Writing 743 documents to Rockset
Done!
```
Pull Request resolved: https://github.com/pytorch/pytorch/pull/107095
Approved by: https://github.com/atalman, https://github.com/seemethere, https://github.com/ZainRizvi
Author: Huy Do
Date: 2023-08-14 19:56:38 +00:00
Committed by: PyTorch MergeBot
Commit: 00751772e6 (parent 8c9b2fe8f0)
2 changed files with 57 additions and 11 deletions

#### `tools/stats/upload_stats_lib.py`

```diff
@@ -23,6 +23,8 @@ S3_RESOURCE = boto3.resource("s3")
 # NB: In CI, a flaky test is usually retried 3 times, then the test file would be rerun
 # 2 more times
 MAX_RETRY_IN_NON_DISABLED_MODE = 3 * 3
+# NB: Rockset has an upper limit of 5000 documents in one request
+BATCH_SIZE = 5000
 
 
 def _get_request_headers() -> Dict[str, str]:
@@ -116,17 +118,29 @@ def download_gha_artifacts(
 def upload_to_rockset(
-    collection: str, docs: List[Any], workspace: str = "commons"
+    collection: str,
+    docs: List[Any],
+    workspace: str = "commons",
+    client: Any = None,
 ) -> None:
-    print(f"Writing {len(docs)} documents to Rockset")
-    client = rockset.RocksetClient(
-        host="api.usw2a1.rockset.com", api_key=os.environ["ROCKSET_API_KEY"]
-    )
+    if not client:
+        client = rockset.RocksetClient(
+            host="api.usw2a1.rockset.com", api_key=os.environ["ROCKSET_API_KEY"]
+        )
+
+    index = 0
+    while index < len(docs):
+        from_index = index
+        to_index = min(from_index + BATCH_SIZE, len(docs))
+        print(f"Writing {to_index - from_index} documents to Rockset")
 
-    client.Documents.add_documents(
-        collection=collection,
-        data=docs,
-        workspace=workspace,
-    )
+        client.Documents.add_documents(
+            collection=collection,
+            data=docs[from_index:to_index],
+            workspace=workspace,
+        )
+
+        index += BATCH_SIZE
 
     print("Done!")
```

#### `upload_stats_lib` unit tests

```diff
@@ -4,7 +4,7 @@ import unittest
 from typing import Any, Dict
 from unittest import mock
 
-from tools.stats.upload_stats_lib import emit_metric
+from tools.stats.upload_stats_lib import BATCH_SIZE, emit_metric, upload_to_rockset
 
 # default values
 REPO = "some/repo"
@@ -109,6 +109,38 @@ class TestUploadStats(unittest.TestCase):
         self.assertFalse(put_item_invoked)
 
+    def test_upload_to_rockset_batch_size(self) -> None:
+        cases = [
+            {
+                "batch_size": BATCH_SIZE - 1,
+                "expected_number_of_requests": 1,
+            },
+            {
+                "batch_size": BATCH_SIZE,
+                "expected_number_of_requests": 1,
+            },
+            {
+                "batch_size": BATCH_SIZE + 1,
+                "expected_number_of_requests": 2,
+            },
+        ]
+
+        for case in cases:
+            mock_client = mock.Mock()
+            mock_client.Documents.add_documents.return_value = "OK"
+
+            batch_size = case["batch_size"]
+            expected_number_of_requests = case["expected_number_of_requests"]
+
+            docs = list(range(batch_size))
+            upload_to_rockset(
+                collection="test", docs=docs, workspace="commons", client=mock_client
+            )
+
+            self.assertEqual(
+                mock_client.Documents.add_documents.call_count,
+                expected_number_of_requests,
+            )
+
 
 if __name__ == "__main__":
     unittest.main()
```
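
As a sanity check on the expected request counts above: the upload loop issues ⌈len(docs) / BATCH_SIZE⌉ requests (a quick hypothetical check, not part of the patch):

```python
import math

BATCH_SIZE = 5000
for n in (BATCH_SIZE - 1, BATCH_SIZE, BATCH_SIZE + 1):
    # Matches expected_number_of_requests in the test: 1, 1, 2
    print(n, "->", math.ceil(n / BATCH_SIZE))
```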