Upload perf stats to both Rockset and dynamoDB (#129544)

To avoid outage on HUD, I plan to migrate perf stats to dynamoDB as follows:

1. Upload perf stats to both Rockset and dynamoDB
2. Copy all the existing content from Rockset to dynamoDB
3. Create new Rockset tables to map to dynamoDB
4. Switch HUD to use the new Rockset tables (temporarily)
5. Delete the existing tables

This depends on https://github.com/pytorch-labs/pytorch-gha-infra/pull/422

### Testing

```
python3 -m tools.stats.upload_dynamo_perf_stats --workflow-run-id 9770217910 --workflow-run-attempt 1 --repo "pytorch/pytorch" --head-branch "gh/shunting314/162/head" --rockset-collection torch_dynamo_perf_stats --rockset-workspace inductor --dynamodb-table torchci-dynamo-perf-stats --match-filename "^inductor_"
...
Writing 1607 documents to DynamoDB torchci-dynamo-perf-stats
```

And confirm the same number of documents is on the table

![Screenshot 2024-07-03 at 18 10 35](https://github.com/pytorch/pytorch/assets/475357/6c055c96-00ca-4cb3-bbe5-fe4914f9da9b)

Pull Request resolved: https://github.com/pytorch/pytorch/pull/129544
Approved by: https://github.com/clee2000
This commit is contained in:
Huy Do
2024-07-05 16:31:49 +00:00
committed by PyTorch MergeBot
parent e7ab7b83bc
commit a33ee73a28
4 changed files with 69 additions and 17 deletions

View File

@ -6,7 +6,7 @@ import json
import os
import zipfile
from pathlib import Path
from typing import Any
from typing import Any, Callable, Dict, List, Optional
import boto3 # type: ignore[import]
import requests
@ -141,6 +141,21 @@ def upload_to_rockset(
print("Done!")
def upload_to_dynamodb(
dynamodb_table: str,
repo: str,
docs: List[Any],
generate_partition_key: Optional[Callable[[str, Dict[str, Any]], str]],
) -> None:
print(f"Writing {len(docs)} documents to DynamoDB {dynamodb_table}")
# https://boto3.amazonaws.com/v1/documentation/api/latest/guide/dynamodb.html#batch-writing
with boto3.resource("dynamodb").Table(dynamodb_table).batch_writer() as batch:
for doc in docs:
if generate_partition_key:
doc["dynamoKey"] = generate_partition_key(repo, doc)
batch.put_item(Item=doc)
def upload_to_s3(
bucket_name: str,
key: str,