[BE] Remove outdated RPC benchmark (#146716)

We have lots of outdated, unused, and uncalled code in our codebase, notably in our benchmarks and examples folders among others. The last change to this directory was 4 years ago and the code looks dead. cc @albanD @H-Huang for feedback

Pull Request resolved: https://github.com/pytorch/pytorch/pull/146716
Approved by: https://github.com/Skylion007, https://github.com/H-Huang
This commit is contained in:
Jane Xu
2025-02-07 10:52:54 -08:00
committed by PyTorch MergeBot
parent beea76020b
commit 2c9e07ecd2
29 changed files with 0 additions and 2535 deletions

View File

@ -1,62 +0,0 @@
# RPC PS Benchmark
## How to add your experiment
1. Data
- Create a data class and add it to the data directory
- Update benchmark_class_helper.py to include your data class in the data_map (a registration sketch follows this list)
- Add configurations to data_configurations.json in the configurations directory
2. Model
- Create a model class and add it to the model directory
- Update benchmark_class_helper.py to include your model class in the model_map
- Add configurations to model_configurations.json in the configurations directory
3. Trainer
- Create a trainer class and add it to the trainer directory
- Update benchmark_class_helper.py to include your trainer class in the trainer_map
- Add configurations to trainer_configurations.json in the configurations directory
4. Parameter Server
- Create a parameter server class and add it to the parameter_servers directory
- Update benchmark_class_helper.py to include your parameter_server class in the ps_map
- Add configurations to parameter_server_configurations.json in the configurations directory
5. Script
- Create a bash script for your experiment and add it to the experiment_scripts directory
6. Testing
- Add a test method for your script to test_scripts.py
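For example, steps 1–4 each register a class in benchmark_class_helper.py. A minimal sketch of what that registration might look like, assuming hypothetical `MyData`, `MyModel`, `MyTrainer`, and `MyParameterServer` classes in the corresponding directories:
```python
# benchmark_class_helper.py (sketch; module paths and class names are hypothetical)
from data.MyData import MyData
from models.MyModel import MyModel
from trainer.MyTrainer import MyTrainer
from parameter_servers.MyParameterServer import MyParameterServer

# each map key is the id referenced by the *_configurations.json files and CLI flags
data_map = {"MyData": MyData}
model_map = {"MyModel": MyModel}
trainer_map = {"MyTrainer": MyTrainer}
ps_map = {"MyParameterServer": MyParameterServer}
```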
## Trainer class
The trainer directory contains base classes to provide a starting point for implementing a trainer.
Inherit from a base class and implement your trainer. The benchmark has two requirements for trainers.
1. It must implement an `__init__` method that takes `rank`, `trainer_count`, and `ps_rref` as its first arguments
```python
def __init__(self, rank, trainer_count, ps_rref, backend, use_cuda_rpc):
```
2. It must implement a `train` method that takes `model` and `data` as arguments.
```python
def train(self, model, data):
```
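A minimal sketch of a trainer that satisfies both requirements (the `MyTrainer` name and the loss/optimizer choices are illustrative only, not part of the benchmark):
```python
import torch

class MyTrainer:
    def __init__(self, rank, trainer_count, ps_rref, backend, use_cuda_rpc):
        # rank: this trainer's worker rank; trainer_count: total number of trainers
        # ps_rref: RRef to the parameter server; backend/use_cuda_rpc: communication settings
        self.rank = rank
        self.trainer_count = trainer_count
        self.ps_rref = ps_rref
        self.backend = backend
        self.use_cuda_rpc = use_cuda_rpc

    def train(self, model, data):
        # data is assumed to be a list of (input, target) batches prepared by the launcher
        criterion = torch.nn.CrossEntropyLoss()
        optimizer = torch.optim.SGD(model.parameters(), lr=1e-4)
        for inputs, targets in data:
            optimizer.zero_grad()
            loss = criterion(model(inputs), targets)
            loss.backward()
            optimizer.step()
```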
## Parameter Server class
The parameter_server directory contains base classes to provide a starting point for implementing a parameter server.
Inherit from a base class and implement your parameter server. The benchmark has two requirements for parameter servers.
1. It must implement an `__init__` method that takes `rank` and `ps_trainer_count` as its first arguments
```python
def __init__(self, rank, ps_trainer_count, backend, use_cuda_rpc):
```
2. It must implement a `reset_state` method that takes `ps_rref` as an argument
```python
def reset_state(ps_rref):
```
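A minimal sketch of a parameter server that satisfies both requirements (`MyParameterServer` is a hypothetical name; the benchmark's own servers additionally average gradients, as in server/server.py):
```python
class MyParameterServer:
    def __init__(self, rank, ps_trainer_count, backend, use_cuda_rpc):
        self.rank = rank
        self.ps_trainer_count = ps_trainer_count
        self.backend = backend
        self.use_cuda_rpc = use_cuda_rpc
        self.state = {}

    @staticmethod
    def reset_state(ps_rref):
        # called between the warmup run and the measured benchmark run
        self = ps_rref.local_value()
        self.state.clear()
```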
## Testing
Use `pytest` to run the test methods added to `test_scripts.py`. To run all of the script tests, use `pytest test_scripts.py`.
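A test method typically launches the corresponding experiment script and asserts that it exits cleanly; a sketch (the script name is hypothetical):
```python
import os
import subprocess

def test_my_experiment():
    # assumes experiment_scripts/my_experiment.sh exists and is runnable with bash
    script = os.path.join("experiment_scripts", "my_experiment.sh")
    result = subprocess.run(["bash", script], capture_output=True, text=True)
    assert result.returncode == 0, result.stderr
```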

View File

@ -1,11 +0,0 @@
{
"DummyData": {
"data_class": "DummyData",
"configurations": {
"max_val": 1024,
"sample_count": 1024,
"sample_length": 1024,
"sparsity_percentage": 20
}
}
}

View File

@ -1,24 +0,0 @@
{
"DummyModel": {
"model_class": "DummyModel",
"configurations": {
"num_embeddings": 1024,
"embedding_dim": 1024,
"dense_input_size": 1024,
"dense_output_size": 1024,
"dense_layers_count": 8,
"sparse": false
}
},
"DummyModelSparse": {
"model_class": "DummyModel",
"configurations": {
"num_embeddings": 1024,
"embedding_dim": 1024,
"dense_input_size": 1024,
"dense_output_size": 1024,
"dense_layers_count": 8,
"sparse": true
}
}
}

View File

@ -1,53 +0,0 @@
import random
import numpy as np
import torch
from torch.utils.data import Dataset
class DummyData(Dataset):
def __init__(
self,
max_val: int,
sample_count: int,
sample_length: int,
sparsity_percentage: int,
):
r"""
A data class that generates random data.
Args:
max_val (int): the maximum value for an element
sample_count (int): count of training samples
sample_length (int): number of elements in a sample
sparsity_percentage (int): the percentage of
embeddings used by the input data in each iteration
"""
self.max_val = max_val
self.input_samples = sample_count
self.input_dim = sample_length
self.sparsity_percentage = sparsity_percentage
def generate_input():
percentage_of_elements = (100 - self.sparsity_percentage) / 100.0
index_count = int(self.max_val * percentage_of_elements)
elements = list(range(self.max_val))
random.shuffle(elements)
elements = elements[:index_count]
data = [
[
elements[random.randint(0, index_count - 1)]
for _ in range(self.input_dim)
]
for _ in range(self.input_samples)
]
return torch.from_numpy(np.array(data))
self.input = generate_input()
self.target = torch.randint(0, max_val, [sample_count])
def __len__(self):
return len(self.input)
def __getitem__(self, index):
return self.input[index], self.target[index]

View File

@ -1,4 +0,0 @@
from .DummyData import DummyData
data_map = {"DummyData": DummyData}

View File

@ -1,545 +0,0 @@
import argparse
import json
import os
from pathlib import Path
from data import data_map
from metrics.ProcessedMetricsPrinter import ProcessedMetricsPrinter
from models import model_map
from server import server_map
from trainer import (
criterion_map,
ddp_hook_map,
ddp_model_map,
hook_state_map,
iteration_step_map,
preprocess_data_map,
trainer_map,
)
import torch
import torch.distributed as c10d
import torch.distributed.rpc as rpc
import torch.multiprocessing as mp
from torch.distributed.rpc import TensorPipeRpcBackendOptions
from torch.futures import wait_all
from torch.utils.data import DataLoader
def get_name(rank, args):
r"""
A function that gets the name for the rank
argument
Args:
rank (int): process number in the world
args (parser): benchmark configurations
"""
t_count = args.ntrainer + args.ncudatrainer
s_count = args.nserver + args.ncudaserver
if rank < t_count:
return f"trainer{rank}"
elif rank < (t_count + s_count):
return f"server{rank}"
else:
return "master"
def get_server_rank(args, rank):
r"""
A function that gets the server rank for
the rank argument.
Args:
args (parser): benchmark configurations
rank (int): trainer rank
"""
s_offset = args.ntrainer + args.ncudatrainer
tps = args.ntrainer // args.nserver
return rank // tps + s_offset
def get_cuda_server_rank(args, rank):
r"""
A function that gets the cudaserver rank for
the rank argument.
Args:
args (parser): benchmark configurations
rank (int): trainer rank
"""
s_offset = args.ntrainer + args.ncudatrainer + args.nserver
t_index = rank - args.ntrainer
ctps = args.ncudatrainer // args.ncudaserver
return t_index // ctps + s_offset
def get_server_rref(server_rank, args, extra_args):
r"""
A function that creates an RRef to the server.
Args:
server_rank (int): process number in the world
args (parser): benchmark configurations
extra_args (dict): configurations added by the user
"""
server = server_map[args.server]
name = get_name(server_rank, args)
if extra_args is not None:
server_args = extra_args.values()
else:
server_args = []
if server_rank >= args.ntrainer + args.ncudatrainer + args.nserver:
trainer_count = args.ncudatrainer // args.ncudaserver
use_cuda_rpc = True
else:
trainer_count = args.ntrainer // args.nserver
use_cuda_rpc = False
return rpc.remote(
name,
server,
args=(
server_rank,
trainer_count,
use_cuda_rpc,
*server_args,
),
)
def run_trainer(args, extra_args, data, rank, server_rref):
r"""
A function that obtains a trainer instance and calls
the train method.
Args:
args (parser): benchmark configurations
extra_args (dict): configurations added by the user
data (list): training samples
rank (int): process number in the world
server_rref (RRef): remote reference to the server assigned to this trainer
"""
trainer_class = trainer_map[args.trainer]
if extra_args is not None:
trainer_args = extra_args.values()
else:
trainer_args = []
trainer_count = args.ntrainer + args.ncudatrainer
store = c10d.FileStore(args.filestore, trainer_count)
if args.backend == "gloo":
process_group = c10d.ProcessGroupGloo(store, rank, trainer_count)
elif args.backend == "nccl":
process_group = c10d.ProcessGroupNCCL(store, rank, trainer_count)
elif args.backend == "multi":
process_group = c10d.ProcessGroupNCCL(store, rank, trainer_count)
if not c10d.is_initialized():
c10d.init_process_group(backend="gloo", rank=rank, world_size=trainer_count)
model = load_model(args)
preprocess_data = preprocess_data_map[args.preprocess_data]
create_criterion = criterion_map[args.create_criterion]
create_ddp_model = ddp_model_map[args.create_ddp_model]
iteration_step = iteration_step_map[args.iteration_step]
hook_state_class = hook_state_map[args.hook_state]
hook = ddp_hook_map[args.ddp_hook]
# check if this is a cudatrainer
use_cuda_rpc = rank >= args.ntrainer
trainer = trainer_class(
process_group,
use_cuda_rpc,
server_rref,
args.backend,
args.epochs,
preprocess_data,
create_criterion,
create_ddp_model,
hook_state_class,
hook,
iteration_step,
*trainer_args,
)
trainer.train(model, data)
metrics = trainer.get_metrics()
return [rank, metrics]
def call_trainers(args, extra_args, train_data, server_rrefs):
r"""
A function that starts the trainers. Each trainer is started
using an rpc_async request.
Args:
args (parser): benchmark configurations
extra_args (dict): configurations added by the user
train_data (list): training samples
server_rrefs (dict): a dictionary containing server RRefs
"""
futs = []
for trainer_rank in range(0, args.ntrainer + args.ncudatrainer):
trainer_name = get_name(trainer_rank, args)
server_rref = None
if server_rrefs:
if trainer_rank >= args.ntrainer:
server_rank = get_cuda_server_rank(args, trainer_rank)
else:
server_rank = get_server_rank(args, trainer_rank)
server_rref = server_rrefs[server_rank]
fut = rpc.rpc_async(
trainer_name,
run_trainer,
args=(
args,
extra_args,
train_data[trainer_rank],
trainer_rank,
server_rref,
),
timeout=args.rpc_timeout,
)
futs.append(fut)
return futs
def benchmark_warmup(args, extra_args, data, server_rrefs):
r"""
A function that runs the training algorithm once. The goal of this
function is to warm up RPC. The server states are reset afterwards.
Args:
args (parser): benchmark configurations
extra_args (dict): configurations added by the user
data (list): training samples
server_rrefs (dict): a dictionary containing server RRefs
"""
futs = call_trainers(args, extra_args, data, server_rrefs)
wait_all(futs)
for server_rref in server_rrefs.values():
server_rref.rpc_sync().reset_state(server_rref)
print("benchmark warmup done\n")
def split_list(arr, n):
r"""
A function that splits a list into n lists
Args:
arr (list): training samples
n (int): number of output lists
"""
return [arr[i::n] for i in range(n)]
def get_server_metrics(server_rrefs):
r"""
A function that calls the remote server to obtain metrics
collected during the benchmark run.
Args:
server_rrefs (dict): a dictionary containing server RRefs
"""
rank_metrics = []
for rank, server_rref in server_rrefs.items():
metrics = server_rref.rpc_sync().get_metrics(server_rref)
rank_metrics.append([rank, metrics])
return rank_metrics
def run_master(rank, data, args, extra_configs, rpc_backend_options):
r"""
A function that runs the master process in the world. This function
obtains remote references to initialized servers, splits the data,
runs the trainers, and prints metrics.
Args:
rank (int): process number in the world
data (list): training samples
args (parser): benchmark configurations
extra_configs (dict): configurations added by the user
rpc_backend_options (TensorPipeRpcBackendOptions): options for the RPC backend
"""
world_size = args.ntrainer + args.ncudatrainer + args.nserver + args.ncudaserver + 1
rpc.init_rpc(
get_name(rank, args),
rank=rank,
world_size=world_size,
rpc_backend_options=rpc_backend_options,
)
server_rrefs = {}
for i in range(args.ntrainer + args.ncudatrainer, world_size - 1):
server_rrefs[i] = get_server_rref(i, args, extra_configs["server_config"])
train_data = split_list(
list(DataLoader(data, batch_size=args.batch_size)),
args.ntrainer + args.ncudatrainer,
)
# warmup run of the benchmark
benchmark_warmup(args, extra_configs["trainer_config"], train_data, server_rrefs)
# run the benchmark
trainer_futs = call_trainers(
args, extra_configs["trainer_config"], train_data, server_rrefs
)
# collect metrics and print
metrics_printer = ProcessedMetricsPrinter()
rank_metrics_list = wait_all(trainer_futs)
metrics_printer.print_metrics("trainer", rank_metrics_list)
rank_metrics_list = get_server_metrics(server_rrefs)
metrics_printer.print_metrics("server", rank_metrics_list)
def run_benchmark(rank, args, data):
r"""
A function that runs the benchmark.
Args:
rank (int): process number in the world
args (parser): configuration args
data (list): training samples
"""
config = load_extra_configs(args)
torch.manual_seed(args.torch_seed)
torch.cuda.manual_seed_all(args.cuda_seed)
torch.backends.cudnn.benchmark = True
torch.backends.cudnn.deterministic = True
world_size = args.ntrainer + args.ncudatrainer + args.nserver + args.ncudaserver + 1
os.environ["MASTER_ADDR"] = args.master_addr
os.environ["MASTER_PORT"] = args.master_port
rpc_backend_options = TensorPipeRpcBackendOptions(rpc_timeout=args.rpc_timeout)
if rank == world_size - 1:
# master = [ntrainer + ncudatrainer + nserver + ncudaserver, ntrainer + ncudatrainer + nserver + ncudaserver]
run_master(rank, data, args, config, rpc_backend_options)
elif rank >= args.ntrainer + args.ncudatrainer:
# parameter_servers = [ntrainer + ncudatrainer, ntrainer + ncudatrainer + nserver + ncudaserver)
rpc.init_rpc(
get_name(rank, args),
rank=rank,
world_size=world_size,
rpc_backend_options=rpc_backend_options,
)
else:
# trainers = [0, ntrainer + ncudatrainer)
if rank >= args.ntrainer:
server_rank = get_cuda_server_rank(args, rank)
server_name = get_name(server_rank, args)
rpc_backend_options.set_device_map(server_name, {rank: server_rank})
trainer_name = get_name(rank, args)
rpc.init_rpc(
trainer_name,
rank=rank,
world_size=world_size,
rpc_backend_options=rpc_backend_options,
)
rpc.shutdown()
def get_json_config(file_name: str, id: str):
r"""
A function that loads a json configuration from a file.
Args:
file_name (str): name of configuration file to load
id (str): configuration that will be loaded
"""
with open(Path(__file__).parent / file_name) as f:
json_config = json.load(f)[id]
return json_config
def load_extra_configs(args):
r"""
A function that creates a dictionary that contains any extra configurations
set by the user. The dictionary will contain two keys trainer_config and
server_config, with default values None.
Args:
args (parser): launcher configurations
"""
trainer_config_file = args.trainer_config_path
server_config_file = args.server_config_path
configurations = {"trainer_config": None, "server_config": None}
if args.trainer is not None and trainer_config_file is not None:
configurations["trainer_config"] = get_json_config(
trainer_config_file, args.trainer
)
if args.server is not None and server_config_file is not None:
configurations["server_config"] = get_json_config(
server_config_file, args.server
)
return configurations
def load_data(args):
r"""
A function that creates an instance of the data class.
Args:
args (parser): launcher configurations
"""
data_config_file = args.data_config_path
data_config = get_json_config(data_config_file, args.data)
data_class = data_map[data_config["data_class"]]
return data_class(**data_config["configurations"])
def load_model(args):
r"""
A function that creates an instance of the model class.
Args:
args (parser): launcher configurations
"""
model_config_file = args.model_config_path
model_config = get_json_config(model_config_file, args.model)
model_class = model_map[model_config["model_class"]]
return model_class(**model_config["configurations"])
def main(args):
r"""
A function that creates multiple processes to run the benchmark.
Args:
args (parser): launcher configurations
"""
# CPU and CUDA trainer checks
if args.ntrainer > 0 and args.ncudatrainer > 0:
assert args.nserver > 0 and args.ncudaserver > 0
if args.nserver > 0:
assert args.ntrainer > 0
assert args.ntrainer % args.nserver == 0
if args.ncudaserver > 0:
assert args.ncudatrainer > 0
assert args.ncudatrainer % args.ncudaserver == 0
world_size = args.ntrainer + args.ncudatrainer + args.nserver + args.ncudaserver + 1
data = load_data(args)
mp.spawn(
run_benchmark,
args=(
args,
data,
),
nprocs=world_size,
join=True,
)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="RPC server Benchmark")
parser.add_argument(
"--master-addr",
"--master_addr",
type=str,
help="IP address of the machine that will host the process with rank 0",
)
parser.add_argument(
"--master-port",
"--master_port",
type=str,
help="A free port on the machine that will host the process with rank 0",
)
parser.add_argument(
"--trainer",
type=str,
help="trainer map key to get trainer class for benchmark run",
)
parser.add_argument("--ntrainer", type=int, help="trainer count for benchmark run")
parser.add_argument(
"--ncudatrainer", type=int, help="cudatrainer count for benchmark run"
)
parser.add_argument(
"--filestore", type=str, help="filestore location for process group"
)
parser.add_argument(
"--server",
type=str,
help="server map key to get trainer class for benchmark run",
)
parser.add_argument("--nserver", type=int, help="server count for benchmark run")
parser.add_argument(
"--ncudaserver", type=int, help="cudaserver count for benchmark run"
)
parser.add_argument(
"--rpc-timeout",
"--rpc_timeout",
type=int,
help="timeout in seconds to use for RPC",
)
parser.add_argument(
"--backend",
type=str,
help="distributed communication backend to use for benchmark run",
)
parser.add_argument("--epochs", type=int, help="epoch count for training")
parser.add_argument(
"--batch-size",
"--batch_size",
type=int,
help="number of training examples used in one iteration",
)
parser.add_argument("--data", type=str, help="id for data configuration")
parser.add_argument("--model", type=str, help="id for model configuration")
parser.add_argument(
"--data-config-path",
"--data_config_path",
type=str,
help="path to data configuration file",
)
parser.add_argument(
"--model-config-path",
"--model_config_path",
type=str,
help="path to model configuration file",
)
parser.add_argument(
"--server-config-path",
"--server_config_path",
type=str,
help="path to server configuration file",
)
parser.add_argument(
"--trainer-config-path",
"--trainer_config_path",
type=str,
help="path to trainer configuration file",
)
parser.add_argument(
"--torch-seed",
"--torch_seed",
type=int,
help="seed for generating random numbers to a non-deterministic random number",
)
parser.add_argument(
"--cuda-seed",
"--cuda_seed",
type=int,
help="seed for generating random numbers to a random number for the current GPU",
)
parser.add_argument(
"--preprocess-data",
"--preprocess_data",
type=str,
help="this function will be used to preprocess data before training",
)
parser.add_argument(
"--create-criterion",
"--create_criterion",
type=str,
help="this function will be used to create the criterion used for model loss calculation",
)
parser.add_argument(
"--create-ddp-model",
"--create_ddp_model",
type=str,
help="this function will be used to create the ddp model used during training",
)
parser.add_argument(
"--hook-state",
"--hook_state",
type=str,
help="this will be the state class used when registering the ddp communication hook",
)
parser.add_argument(
"--ddp-hook",
"--ddp_hook",
type=str,
default="allreduce_hook",
help="ddp communication hook",
)
parser.add_argument(
"--iteration-step",
"--iteration_step",
type=str,
help="this will be the function called for each iteration of training",
)
args = parser.parse_args()
print(f"{args}\n")
main(args)

View File

@ -1,23 +0,0 @@
import time
from .MetricBase import MetricBase
class CPUMetric(MetricBase):
def __init__(self, name: str):
self.name = name
self.start = None
self.end = None
def record_start(self):
self.start = time.time()
def record_end(self):
self.end = time.time()
def elapsed_time(self):
if self.start is None:
raise RuntimeError("start is None")
if self.end is None:
raise RuntimeError("end is None")
return self.end - self.start

View File

@ -1,32 +0,0 @@
import torch
from .MetricBase import MetricBase
class CUDAMetric(MetricBase):
def __init__(self, rank: int, name: str):
self.rank = rank
self.name = name
self.start = None
self.end = None
def record_start(self):
self.start = torch.cuda.Event(enable_timing=True)
with torch.cuda.device(self.rank):
self.start.record()
def record_end(self):
self.end = torch.cuda.Event(enable_timing=True)
with torch.cuda.device(self.rank):
self.end.record()
def elapsed_time(self):
if not self.start.query():
raise RuntimeError("start event did not complete")
if not self.end.query():
raise RuntimeError("end event did not complete")
return self.start.elapsed_time(self.end)
def synchronize(self):
self.start.synchronize()
self.end.synchronize()

View File

@ -1,26 +0,0 @@
from abc import ABC, abstractmethod
class MetricBase(ABC):
def __init__(self, name):
self.name = name
self.start = None
self.end = None
@abstractmethod
def record_start(self):
return
@abstractmethod
def record_end(self):
return
@abstractmethod
def elapsed_time(self):
return
def get_name(self):
return self.name
def get_end(self):
return self.end

View File

@ -1,79 +0,0 @@
from .CPUMetric import CPUMetric
from .CUDAMetric import CUDAMetric
class MetricsLogger:
def __init__(self, rank=None):
self.rank = rank
self.metrics = {}
def record_start(self, type, key, name, cuda):
if type in self.metrics and key in self.metrics[type]:
raise RuntimeError(f"metric_type={type} with key={key} already exists")
if cuda:
if self.rank is None:
raise RuntimeError("rank is required for cuda")
metric = CUDAMetric(self.rank, name)
else:
metric = CPUMetric(name)
if type not in self.metrics:
self.metrics[type] = {}
self.metrics[type][key] = metric
metric.record_start()
def record_end(self, type, key):
if type not in self.metrics or key not in self.metrics[type]:
raise RuntimeError(f"metric_type={type} with key={key} not found")
if self.metrics[type][key].get_end() is not None:
raise RuntimeError(
f"end for metric_type={type} with key={key} already exists"
)
self.metrics[type][key].record_end()
def clear_metrics(self):
self.metrics.clear()
def get_metrics(self):
return self.metrics
def get_processed_metrics(self):
r"""
A method that processes the metrics recorded during the benchmark.
Returns::
A dictionary keyed by "{metric_type},{metric_name}", with a list of
elapsed times as each value.
Examples::
>>> instance = MetricsLogger(rank)
>>> instance.record_start("forward_metric_type", "1", "forward_pass", cuda=True)
>>> instance.record_end("forward_metric_type", "1")
>>> instance.record_start("forward_metric_type", "2", "forward_pass", cuda=True)
>>> instance.record_end("forward_metric_type", "2")
>>> print(instance.metrics)
{
"forward_metric_type": {
"1": metric1,
"2": metric2
}
}
>>> print(instance.get_processed_metrics())
{
"forward_metric_type,forward_pass" : [.0429, .0888]
}
"""
processed_metrics = {}
for metric_type in self.metrics.keys():
for metric_key in self.metrics[metric_type].keys():
metric = self.metrics[metric_type][metric_key]
if isinstance(metric, CUDAMetric):
metric.synchronize()
metric_name = metric.get_name()
elapsed_time = metric.elapsed_time()
processed_metric_name = f"{metric_type},{metric_name}"
if processed_metric_name not in processed_metrics:
processed_metrics[processed_metric_name] = []
processed_metrics[processed_metric_name].append(elapsed_time)
return processed_metrics

View File

@ -1,83 +0,0 @@
import statistics
import pandas as pd
from tabulate import tabulate
class ProcessedMetricsPrinter:
def print_data_frame(self, name, processed_metrics):
print(f"metrics for {name}")
data_frame = self.get_data_frame(processed_metrics)
print(
tabulate(
data_frame, showindex=False, headers=data_frame.columns, tablefmt="grid"
)
)
def combine_processed_metrics(self, processed_metrics_list):
r"""
A method that merges the value arrays of the keys in the dictionary
of processed metrics.
Args:
processed_metrics_list (list): a list containing dictionaries with
recorded metrics as keys, and the values are lists of elapsed times.
Returns::
A merged dictionary that is created from the list of dictionaries passed
into the method.
Examples::
>>> instance = ProcessedMetricsPrinter()
>>> dict_1 = trainer1.get_processed_metrics()
>>> dict_2 = trainer2.get_processed_metrics()
>>> print(dict_1)
{
"forward_metric_type,forward_pass" : [.0429, .0888]
}
>>> print(dict_2)
{
"forward_metric_type,forward_pass" : [.0111, .0222]
}
>>> processed_metrics_list = [dict_1, dict_2]
>>> result = instance.combine_processed_metrics(processed_metrics_list)
>>> print(result)
{
"forward_metric_type,forward_pass" : [.0429, .0888, .0111, .0222]
}
"""
processed_metric_totals = {}
for processed_metrics in processed_metrics_list:
for metric_name, values in processed_metrics.items():
if metric_name not in processed_metric_totals:
processed_metric_totals[metric_name] = []
processed_metric_totals[metric_name] += values
return processed_metric_totals
def get_data_frame(self, processed_metrics):
df = pd.DataFrame(columns=["name", "min", "max", "mean", "variance", "stdev"])
for metric_name in sorted(processed_metrics.keys()):
values = processed_metrics[metric_name]
row = {
"name": metric_name,
"min": min(values),
"max": max(values),
"mean": statistics.mean(values),
"variance": statistics.variance(values),
"stdev": statistics.stdev(values),
}
df = pd.concat([df, pd.DataFrame([row])], ignore_index=True)
return df
def print_metrics(self, name, rank_metrics_list):
if rank_metrics_list:
metrics_list = []
for rank, metric in rank_metrics_list:
self.print_data_frame(f"{name}={rank}", metric)
metrics_list.append(metric)
combined_metrics = self.combine_processed_metrics(metrics_list)
self.print_data_frame(f"all {name}", combined_metrics)
def save_to_file(self, data_frame, file_name):
file_name = f"data_frames/{file_name}.csv"
data_frame.to_csv(file_name, encoding="utf-8", index=False)

View File

@ -1,36 +0,0 @@
import torch.nn as nn
import torch.nn.functional as F
class DummyModel(nn.Module):
def __init__(
self,
num_embeddings: int,
embedding_dim: int,
dense_input_size: int,
dense_output_size: int,
dense_layers_count: int,
sparse: bool,
):
r"""
A dummy model with an EmbeddingBag Layer and Dense Layer.
Args:
num_embeddings (int): size of the dictionary of embeddings
embedding_dim (int): the size of each embedding vector
dense_input_size (int): size of each input sample
dense_output_size (int): size of each output sample
dense_layers_count (int): number of dense layers in dense Sequential module
sparse (bool): if True, gradient w.r.t. weight matrix will be a sparse tensor
"""
super().__init__()
self.embedding = nn.EmbeddingBag(num_embeddings, embedding_dim, sparse=sparse)
self.dense = nn.Sequential(
*[
nn.Linear(dense_input_size, dense_output_size)
for _ in range(dense_layers_count)
]
)
def forward(self, x):
x = self.embedding(x)
return F.softmax(self.dense(x), dim=1)

View File

@ -1,4 +0,0 @@
from .DummyModel import DummyModel
model_map = {"DummyModel": DummyModel}

View File

@ -1,7 +0,0 @@
from .server import AverageBatchParameterServer, AverageParameterServer
server_map = {
"AverageParameterServer": AverageParameterServer,
"AverageBatchParameterServer": AverageBatchParameterServer,
}

View File

@ -1,331 +0,0 @@
import functools
import threading
import time
from abc import ABC, abstractmethod
from metrics.MetricsLogger import MetricsLogger
from utils import sparse_rpc_format_to_tensor, sparse_tensor_to_rpc_format
import torch
import torch.distributed.rpc as rpc
class ParameterServerBase(ABC):
PARAMETER_SERVER_BATCH_METRIC = "parameter_server_batch_metric"
PARAMETER_SERVER_STRAGGLER_METRIC = "parameter_server_straggler_metric"
PARAM_INDEX_STRAGGLER = "param_index_straggler"
PARAM_INDEX_BATCH = "param_index_batch"
def __init__(self, rank):
r"""
Inits ParameterServerBase class.
Args:
rank (int): worker rank
"""
self.__metrics_logger = MetricsLogger(rank)
@abstractmethod
def process_gradient(self):
r"""
A method to be implemented by child class that will process a
gradient received by a server.
"""
return
@staticmethod
@abstractmethod
def average_gradient():
r"""
A method to be implemented by child class that will average
gradients.
"""
return
@staticmethod
@abstractmethod
def reset_state():
r"""
A method to be implemented by child class that will reset
the server state.
"""
return
def record_start(self, type, key, name, cuda=True):
r"""
A method that records the start event for a metric.
Args:
type (str): group id for metric
key (str): unique id for metric within a group
name (str): description of the metric
cuda (bool): indicator to determine if this is a CUDA metric
"""
self.__metrics_logger.record_start(type, key, name, cuda)
def record_end(self, type, key):
r"""
A method that records the end event for a metric
Args:
type (str): group id for metric
key (str): unique id for metric within a group
"""
self.__metrics_logger.record_end(type, key)
def record_straggler_start(self, key, cuda=True):
r"""
A helper method that records a straggler metric
for the given key. A user should call this when
the first gradient for the param location is received.
Args:
key (str): unique id for metric within a group
cuda (bool): indicator to determine if this is a CUDA metric
"""
self.__metrics_logger.record_start(
self.PARAMETER_SERVER_STRAGGLER_METRIC,
key,
self.PARAM_INDEX_STRAGGLER,
cuda,
)
def record_straggler_end(self, key):
r"""
A helper method that records a straggler metric
for the given key. A user should call this when
the last gradient for the param location is received.
Args:
key (str): unique id for metric within a group
"""
self.__metrics_logger.record_end(self.PARAMETER_SERVER_STRAGGLER_METRIC, key)
def record_batch_start(self, key, cuda=True):
r"""
A helper method that records a batch metric
for the given key. A user should call this when
the first gradient for the param location is received.
Args:
key (str): unique id for metric within a group
cuda (bool): indicator to determine if this is a CUDA metric
"""
self.__metrics_logger.record_start(
self.PARAMETER_SERVER_BATCH_METRIC, key, self.PARAM_INDEX_BATCH, cuda
)
def record_batch_end(self, key):
r"""
A helper method that records a batch metric
for the given key. A user should call this when
all futures for a param location have had their
result set.
Args:
key (str): unique id for metric within a group
"""
self.__metrics_logger.record_end(self.PARAMETER_SERVER_BATCH_METRIC, key)
@staticmethod
def record_method(name, type="method_metric", cuda=True):
r"""
A decorator that records a metric for the decorated method.
Args:
name (str): description of the metric
type (str): group id for metric
cuda (bool): indicator to determine if this is a CUDA metric
"""
def decorator(function):
@functools.wraps(function)
def wrapper(self, *args):
key = time.time()
self.__metrics_logger.record_start(type, key, name, cuda)
result = function(self, *args)
self.__metrics_logger.record_end(type, key)
return result
return wrapper
return decorator
@staticmethod
def get_metrics(server_rref):
r"""
A staticmethod that returns metrics captured by the __metrics_logger.
Args:
server_rref (RRef): remote reference to the server
"""
self = server_rref.local_value()
return self.__metrics_logger.get_processed_metrics()
def clear_metrics(self):
r"""
A method that clears __metrics_logger recorded metrics.
"""
return self.__metrics_logger.clear_metrics()
class AverageParameterServer(ParameterServerBase):
def __init__(self, rank, trainer_count, use_cuda_rpc):
r"""
A parameter server that averages the gradients
from trainers for each training iteration step.
Gradients are added as they are received from trainers.
When all gradients have been received, the sum is
divided by the number of trainers.
Args:
rank (int): worker rank
trainer_count (int): count of trainers sending
gradients to the server
use_cuda_rpc (bool): indicator for CUDA RPC
"""
super().__init__(rank)
self.lock = threading.Lock()
self.rank = rank
self.trainer_count = trainer_count
self.use_cuda_rpc = use_cuda_rpc
self.batch_number = 0
self.futures = {}
self.gradient_dict = {}
@staticmethod
def reset_state(server_rref):
r"""
A method that clears the state of the server.
Args:
server_rref (RRef): remote reference to the server
"""
self = server_rref.local_value()
self.batch_number = 0
self.futures.clear()
self.gradient_dict.clear()
self.clear_metrics()
def param_key(self, param_loc):
r"""
A method that returns an encoded key that represents
the current batch and param location.
Args:
param_loc (int): bucket location sent by the trainer
containing the gradient
"""
return f"{self.batch_number},{param_loc}"
def clear_batch_state(self):
r"""
Clears the current server batch state.
"""
self.futures.clear()
self.gradient_dict.clear()
def process_gradient(self, gradient, param_loc):
r"""
Stores the gradient if param_loc is not in gradient_dict.
Adds the gradient to param_loc if it is in gradient_dict.
Args:
gradient (torch.Tensor): tensor sent from trainer
param_loc (int): bucket location sent by the trainer
containing the gradient
"""
if param_loc not in self.gradient_dict:
self.record_straggler_start(self.param_key(param_loc))
self.record_batch_start(self.param_key(param_loc))
self.gradient_dict[param_loc] = gradient
else:
self.gradient_dict[param_loc] += gradient
@ParameterServerBase.record_method(name="average computation")
def average(self, param_loc):
r"""
Obtains the tensor at the param_loc in the gradient_dict
and then divides by number of trainers.
Args:
param_loc (int): bucket location sent by the trainer
containing the gradient
"""
param_loc_avg = self.gradient_dict[param_loc]
param_loc_avg = param_loc_avg / (1.0 * self.trainer_count)
return param_loc_avg
@staticmethod
@rpc.functions.async_execution
def average_gradient(server_rref, received_batch_number, param_loc, gradient):
r"""
An asynchronous function that will average gradients
sent from trainers.
Args:
server_rref (RRef): remote reference to the server
received_batch_number (int): batch number sent by
the trainer
param_loc (int): bucket location sent by the trainer
containing the gradient
gradient (torch.Tensor or list): tensor sent by the trainer
"""
self = server_rref.local_value()
if type(gradient) is list:
gradient = sparse_rpc_format_to_tensor(gradient)
gradient = gradient.cuda(self.rank)
fut = torch.futures.Future()
with self.lock:
if self.batch_number < received_batch_number:
self.batch_number = received_batch_number
self.clear_batch_state()
self.process_gradient(gradient, param_loc)
if param_loc not in self.futures:
self.futures[param_loc] = []
self.futures[param_loc].append(fut)
if len(self.futures[param_loc]) == self.trainer_count:
self.record_straggler_end(self.param_key(param_loc))
param_loc_avg = self.average(param_loc)
if not self.use_cuda_rpc:
param_loc_avg = param_loc_avg.cpu()
if param_loc_avg.is_sparse:
param_loc_avg = sparse_tensor_to_rpc_format(param_loc_avg)
for cur_fut in self.futures[param_loc]:
cur_fut.set_result(param_loc_avg)
self.record_batch_end(self.param_key(param_loc))
return fut
class AverageBatchParameterServer(AverageParameterServer):
def __init__(self, rank, trainer_count, use_cuda_rpc):
r"""
A parameter server that averages the gradients
from trainers for each training iteration step.
Gradients are stored and averaged when a gradient
has been received from each trainer for a param
location.
Args:
rank (int): worker rank
trainer_count (int): count of trainers sending
gradients to the server
use_cuda_rpc (bool): indicator for CUDA RPC
"""
super().__init__(rank, trainer_count, use_cuda_rpc)
def process_gradient(self, gradient, param_loc):
r"""
Adds the gradient to param_loc bucket stored in
the gradient_dict.
Args:
gradient (torch.Tensor): tensor sent from trainer
param_loc (int): bucket location sent by the trainer
containing the gradient
"""
if param_loc not in self.gradient_dict:
self.record_straggler_start(self.param_key(param_loc))
self.record_batch_start(self.param_key(param_loc))
self.gradient_dict[param_loc] = []
self.gradient_dict[param_loc].append(gradient)
@ParameterServerBase.record_method(name="average computation")
def average(self, param_loc):
r"""
Sums the gradients at the param_loc then divides by the
number of trainers.
Args:
param_loc (int): bucket location sent by the trainer
containing the gradient
"""
param_loc_avg = self.gradient_dict[param_loc][0]
for gradient in self.gradient_dict[param_loc][1:]:
param_loc_avg += gradient
param_loc_avg = param_loc_avg / (1.0 * self.trainer_count)
return param_loc_avg

View File

@ -1,27 +0,0 @@
from .criterions import cel
from .ddp_models import basic_ddp_model
from .hook_states import BasicHookState
from .hooks import allreduce_hook, hybrid_hook, rpc_hook, sparse_rpc_hook
from .iteration_steps import basic_iteration_step
from .preprocess_data import preprocess_dummy_data
from .trainer import DdpTrainer
criterion_map = {"cel": cel}
ddp_hook_map = {
"allreduce_hook": allreduce_hook,
"hybrid_hook": hybrid_hook,
"rpc_hook": rpc_hook,
"sparse_rpc_hook": sparse_rpc_hook,
}
ddp_model_map = {"basic_ddp_model": basic_ddp_model}
iteration_step_map = {"basic_iteration_step": basic_iteration_step}
preprocess_data_map = {"preprocess_dummy_data": preprocess_dummy_data}
hook_state_map = {"BasicHookState": BasicHookState}
trainer_map = {"DdpTrainer": DdpTrainer}

View File

@ -1,10 +0,0 @@
import torch.nn as nn
def cel(rank):
r"""A function that creates a CrossEntropyLoss
criterion for training.
Args:
rank (int): worker rank
"""
return nn.CrossEntropyLoss().cuda(rank)

View File

@ -1,21 +0,0 @@
from torch.nn.parallel import DistributedDataParallel as DDP
def basic_ddp_model(self, rank, model, process_group, hook_state, hook):
r"""
A function that creates a ddp_model and hook_state objects.
The ddp model is initialized with a single device id and
the process group. The ddp_model also registers the communication
hook.
Args:
rank (int): worker rank
model (nn.Module): neural network model
process_group (ProcessGroup): distributed process group
hook_state (class): class that will be used to keep track of state
during training.
hook (function): ddp communication hook
"""
ddp_model = DDP(model, device_ids=[rank], process_group=process_group)
hook_state = hook_state(self, process_group)
ddp_model.register_comm_hook(hook_state, hook)
return ddp_model, hook_state

View File

@ -1,27 +0,0 @@
class BasicHookState:
def __init__(self, cref, process_group):
r"""
A class that holds state information that is needed by the communication hook
during the training algorithm.
Args:
cref (DdpTrainer): reference to the trainer instance
process_group (ProcessGroup): distributed process group
"""
self.cref = cref
self.process_group = process_group
self.batch_number = -1
def get_key(self, bucket_index):
r"""
A method that returns an encoded key that represents the current batch and
bucket index.
Args:
bucket_index (int): index of the bucket being processed in backward
"""
return f"{self.batch_number},{bucket_index}"
def next_batch(self):
r"""
A method that increments batch_number by 1.
"""
self.batch_number += 1

View File

@ -1,100 +0,0 @@
from utils import process_bucket_with_remote_server
import torch
import torch.distributed as c10d
def allreduce_hook(state, bucket):
r"""
A ddp communication hook that uses the process_group allreduce implementation.
Args:
state (object): maintains state during the training process
bucket (GradBucket): gradient bucket
"""
cref = state.cref
tensor = bucket.buffer()
tensors = [tensor / state.process_group.size()]
key = state.get_key(bucket.get_index())
if tensor.is_sparse:
tensor = tensor.coalesce()
tensor_type = "sparse" if tensor.is_sparse else "dense"
cref.record_start(
"hook_future_metric", key, f"{cref.backend}_{tensor_type}_allreduce"
)
fut = state.process_group.allreduce(tensors).get_future()
def callback(fut):
cref.record_end("hook_future_metric", key)
return fut.wait()
return fut.then(callback)
def hybrid_hook(state, bucket):
r"""
A ddp communication hook that uses Gloo default process
group for sparse gradients and NCCL non-default process
group for dense gradients.
Args:
state (object): maintains state during the training process
bucket (GradBucket): gradient bucket
"""
cref = state.cref
tensor = bucket.buffer()
key = state.get_key(bucket.get_index())
if tensor.is_sparse:
cref.record_start("hook_c10d_metric", key, "gloo_sparse_allreduce")
tensor = tensor.coalesce()
tensor = tensor / state.process_group.size()
c10d.all_reduce(tensor, op=c10d.ReduceOp.SUM)
cref.record_end("hook_c10d_metric", key)
fut = torch.futures.Future()
fut.set_result([tensor])
else:
cref.record_start("hook_future_metric", key, "nccl_dense_allreduce")
tensors = [bucket.buffer() / state.process_group.size()]
fut = state.process_group.allreduce(tensors).get_future()
def callback(fut):
cref.record_end("hook_future_metric", key)
return fut.wait()
fut = fut.then(callback)
return fut
def rpc_hook(state, bucket):
r"""
A ddp communication hook that averages sparse and dense tensors using
process_bucket_with_remote_server method.
Args:
state (object): maintains state during the training process
bucket (GradBucket): gradient bucket
"""
return process_bucket_with_remote_server(state, bucket)
def sparse_rpc_hook(state, bucket):
r"""
A ddp communication hook that uses the current backend allreduce
implementation for dense tensors and a server for sparse tensors.
Args:
state (object): maintains state during the training process
bucket (GradBucket): gradient bucket
"""
tensor = bucket.buffer()
if tensor.is_sparse:
return process_bucket_with_remote_server(state, bucket)
else:
cref = state.cref
tensor = [tensor / state.process_group.size()]
key = state.get_key(bucket.get_index())
cref.record_start("hook_future_metric", key, f"{cref.backend}_dense_allreduce")
fut = state.process_group.allreduce(tensor).get_future()
def callback(fut):
cref.record_end("hook_future_metric", key)
return fut.wait()
return fut.then(callback)

View File

@ -1,25 +0,0 @@
def basic_iteration_step(
self, ddp_model, criterion, optimizer, hook_state, epoch, index, batch
):
r"""
A function that performs an iteration of training.
Args:
ddp_model (nn.Module): distributed data parallel model
criterion (nn.Module): loss function to measure model
optimizer (optim.Optimizer): updates model parameters
hook_state (object): ddp communication hook state object
epoch (int): index of pass through the data
index (int): zero-based index of the current batch within the epoch
batch (list): training examples
"""
hook_state.next_batch()
self.record_batch_start(self.epoch_key(epoch, index))
optimizer.zero_grad()
self.record_forward_start(self.epoch_key(epoch, index))
loss = criterion(ddp_model(batch[0]), batch[1])
self.record_forward_end(self.epoch_key(epoch, index))
self.record_backward_start(self.epoch_key(epoch, index))
loss.backward()
self.record_backward_end(self.epoch_key(epoch, index))
optimizer.step()
self.record_batch_end(self.epoch_key(epoch, index))

View File

@ -1,12 +0,0 @@
def preprocess_dummy_data(rank, data):
r"""
A function that moves the data from CPU to GPU
for DummyData class.
Args:
rank (int): worker rank
data (list): training examples
"""
for i in range(len(data)):
data[i][0] = data[i][0].cuda(rank)
data[i][1] = data[i][1].cuda(rank)
return data

View File

@ -1,246 +0,0 @@
import functools
import time
from abc import ABC, abstractmethod
from metrics.MetricsLogger import MetricsLogger
import torch
class TrainerBase(ABC):
BATCH_LEVEL_METRIC = "batch_level_metric"
BATCH_ALL = "batch_all"
FORWARD_METRIC = "forward_metric"
FORWARD_PASS = "forward_pass"
BACKWARD_METRIC = "backward_metric"
BACKWARD = "backward"
def __init__(self, rank):
r"""
Inits TrainerBase class.
Args:
rank (int): worker rank
"""
self.__metrics_logger = MetricsLogger(rank)
@abstractmethod
def train(self):
r"""
A method to be implemented by child class that will train a neural network.
"""
return
def record_start(self, type, key, name, cuda=True):
r"""
A method that records the start event for a metric.
Args:
type (str): group id for metric
key (str): unique id for metric within a group
name (str): description of the metric
cuda (bool): indicator to determine if this is a CUDA metric
"""
self.__metrics_logger.record_start(type, key, name, cuda)
def record_end(self, type, key):
r"""
A method that records the end event for a metric.
Args:
type (str): group id for metric
key (str): unique id for metric within a group
"""
self.__metrics_logger.record_end(type, key)
def record_batch_start(self, key, cuda=True):
r"""
A helper method that records a batch metric for the
given key. A user should call this at the start of an
iteration step during training.
Args:
key (str): unique id for metric within a group
cuda (bool): indicator to determine if this is a CUDA metric
"""
self.__metrics_logger.record_start(
self.BATCH_LEVEL_METRIC, key, self.BATCH_ALL, cuda
)
def record_batch_end(self, key):
r"""
A helper method that records a batch metric for the
given key. A user should call this at the end of an
iteration step during training.
Args:
key (str): unique id for metric within a group
"""
self.__metrics_logger.record_end(self.BATCH_LEVEL_METRIC, key)
def record_forward_start(self, key, cuda=True):
r"""
A helper method that records a forward metric
for the given key. A user should call this before
their neural network forward.
Args:
key (str): unique id for metric within a group
cuda (bool): indicator to determine if this is a CUDA metric
"""
self.__metrics_logger.record_start(
self.FORWARD_METRIC, key, self.FORWARD_PASS, cuda
)
def record_forward_end(self, key):
r"""
A helper method that records a forward metric
for the given key. A user should call this after their
neural network forward.
Args:
key (str): unique id for metric within a group
"""
self.__metrics_logger.record_end(self.FORWARD_METRIC, key)
def record_backward_start(self, key, cuda=True):
r"""
A helper method that records a backward metric
for the given key. A user should call this before
their .backward() call.
Args:
key (str): unique id for metric within a group
cuda (bool): indicator to determine if this is a CUDA metric
"""
self.__metrics_logger.record_start(
self.BACKWARD_METRIC, key, self.BACKWARD, cuda
)
def record_backward_end(self, key):
r"""
A helper method that records a backward metric
for the given key. A user should call this after
.backward().
Args:
key (str): unique id for metric within a group
"""
self.__metrics_logger.record_end(self.BACKWARD_METRIC, key)
@staticmethod
def methodmetric(name, type="method_metric", cuda=True):
r"""
A decorator that records a metric for the decorated method.
Args:
name (str): description of the metric
type (str): group id for metric
cuda (bool): indicator to determine if this is a CUDA metric
"""
def decorator(function):
@functools.wraps(function)
def wrapper(self, *args):
key = time.time()
self.__metrics_logger.record_start(type, key, name, cuda)
result = function(self, *args)
self.__metrics_logger.record_end(type, key)
return result
return wrapper
return decorator
def get_metrics(self):
r"""
A method that returns metrics captured by the __metrics_logger.
"""
return self.__metrics_logger.get_processed_metrics()
def clear_metrics(self):
r"""
A method that clears __metrics_logger recorded metrics.
"""
return self.__metrics_logger.clear_metrics()
class DdpTrainer(TrainerBase):
def __init__(
self,
process_group,
use_cuda_rpc,
server_rref,
backend,
epochs,
preprocess_data,
create_criterion,
create_ddp_model,
hook_state_class,
hook,
iteration_step,
):
r"""
A trainer that implements a DDP training algorithm using a simple hook that performs allreduce
using the process_group implementation.
Args:
process_group (ProcessGroup): distributed process group
use_cuda_rpc (bool): indicator for CUDA RPC
server_rref (RRef): remote reference to the server
backend (str): distributed communication backend
epochs (int): epoch count for training
preprocess_data (function): preprocesses data passed
to the trainer before starting training
create_criterion (function): creates a criterion to calculate loss
create_ddp_model (function): creates a ddp model for the trainer
hook_state_class (class): class that will be used to keep track of state
during training.
hook (function): ddp communication hook
iteration_step (function): will perform 1 step of training
"""
super().__init__(process_group.rank())
self.process_group = process_group
self.use_cuda_rpc = use_cuda_rpc
self.server_rref = server_rref
self.backend = backend
self.epochs = epochs
self.preprocess_data = preprocess_data
self.create_criterion = create_criterion
self.create_ddp_model = create_ddp_model
self.hook_state_class = hook_state_class
self.hook = hook
self.iteration_step = iteration_step
self.rank = process_group.rank()
self.trainer_count = process_group.size()
def epoch_key(self, epoch, index):
r"""
A method that returns an encoded key that represents the current epoch and
iteration index.
Args:
epoch (int): epoch index
index (int): iteration index
"""
return f"{epoch},{index}"
def train(self, model, data):
r"""
A method that implements the training algorithm.
Args:
model (nn.Module): neural network model
data (list): training examples
"""
model = model.cuda(self.rank)
data = self.preprocess_data(self.rank, data)
criterion = self.create_criterion(self.rank)
ddp_model, hook_state = self.create_ddp_model(
self, self.rank, model, self.process_group, self.hook_state_class, self.hook
)
optimizer = torch.optim.SGD(ddp_model.parameters(), 1e-4)
for epoch in range(self.epochs):
if epoch % 5 == 0 and self.rank == 0:
print(f"train epoch={epoch}")
for index, batch in enumerate(data):
self.iteration_step(
self,
ddp_model,
criterion,
optimizer,
hook_state,
epoch,
index,
batch,
)
torch.cuda.synchronize(self.rank)

View File

@ -1,60 +0,0 @@
import torch
RPC_SPARSE = "rpc_sparse"
RPC_DENSE = "rpc_dense"
def sparse_tensor_to_rpc_format(sparse_tensor):
r"""
A helper function that creates a list containing the indices, values, and size
of a coalesced sparse tensor.
Args:
sparse_tensor (torch.Tensor): the sparse_coo_tensor to convert
"""
sparse_tensor = sparse_tensor.coalesce()
return [sparse_tensor.indices(), sparse_tensor.values(), sparse_tensor.size()]
def sparse_rpc_format_to_tensor(sparse_rpc_format):
r"""
A helper function that creates a sparse_coo_tensor from indices, values, and size.
Args:
sparse_rpc_format (list): sparse_coo_tensor represented as a list
"""
return torch.sparse_coo_tensor(
sparse_rpc_format[0], sparse_rpc_format[1], sparse_rpc_format[2]
).coalesce()
def process_bucket_with_remote_server(state, bucket):
r"""
Processes a gradient bucket passed by a DDP communication hook
during .backward(). The method supports processing sparse and dense
tensors. It records RPC future completion time metric for the trainer.
Args:
state (object): maintains state during the training process
bucket (GradBucket): gradient bucket
"""
cref = state.cref
tensor = bucket.buffer()
if not cref.use_cuda_rpc:
tensor = tensor.cpu()
sparse = tensor.is_sparse
if sparse:
tensor = sparse_tensor_to_rpc_format(tensor)
b_index = bucket.get_index()
server_args = [cref.server_rref, state.batch_number, b_index, tensor]
key = state.get_key(b_index)
cref.record_start("hook_future_metric", key, RPC_SPARSE if sparse else RPC_DENSE)
fut = cref.server_rref.rpc_async().average_gradient(*server_args)
def callback(fut):
cref.record_end("hook_future_metric", key)
tensor = fut.wait()
if type(tensor) is list:
tensor = sparse_rpc_format_to_tensor(tensor)
tensor = tensor.cuda(cref.rank)
return [tensor]
return fut.then(callback)

View File

@ -1,42 +0,0 @@
# Distributed RPC Reinforcement Learning Benchmark
This tool is used to measure `torch.distributed.rpc` throughput and latency for reinforcement learning.
The benchmark spawns one *agent* process and a configurable number of *observer* processes. As this benchmark focuses on RPC throughput and latency, the agent uses a dummy policy and observers all use randomly generated states and rewards. In each iteration, observers pass their state to the agent through `torch.distributed.rpc` and wait for the agent to respond with an action. If `batch=False`, then the agent will process and respond to a single observer request at a time. Otherwise, the agent will accumulate requests from multiple observers and run them through the policy in one shot. There is also a separate *coordinator* process that manages the *agent* and *observers*.
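The observer implementation is not shown in this excerpt; a rough sketch of the request loop described above, under the assumption that observers call the agent's `select_action_*` functions over RPC (the `observer_loop` helper and its arguments are hypothetical):
```python
import torch
import torch.distributed.rpc as rpc

from agent import AgentBase  # provided by this benchmark


def observer_loop(agent_rref, observer_id, state_size, n_steps, batch):
    # dummy environment: each step sends a random state and waits for an action
    state = torch.rand(state_size)
    select_action = (
        AgentBase.select_action_batch if batch else AgentBase.select_action_non_batch
    )
    for _ in range(n_steps):
        action = rpc.rpc_sync(
            agent_rref.owner(),
            select_action,
            args=(agent_rref, observer_id, state),
        )
        state = torch.rand(state_size)  # the action is ignored; states are random
```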
In addition to printing measurements, this benchmark produces a JSON file. A single argument may be given multiple comma-separated values (e.g. `world_size="10,50,100"`), in which case the resulting JSON file can be passed to the plotting repo to visualize how the results differ. Each value of that variable argument is placed on the x axis.
The benchmark reports 4 key metrics:
1. _Agent Latency_ - The time from when the first action request in a batch is received from an observer to when the agent has selected an action for every request in that batch. If `batch=False`, think of it as `batch_size=1`.
2. _Agent Throughput_ - The number of requests processed per second for a given batch, computed as `batch_size / agent_latency`. If not using batch, think of it as `batch_size=1` (a small numeric sketch follows this list).
3. _Observer Latency_ - The time from the moment a single observer requests an action to the moment it receives the response from the agent. When `batch=False`, observer latency is the agent latency plus the transit time for the request to reach the agent and for the response to return to the observer. When `batch=True` there is more variation, because some observer requests sit in a batch queue longer than others depending on the order in which they arrived.
4. _Observer Throughput_ - The number of requests processed per second for a single observer, computed as `1 / observer_latency`.
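As a rough numeric illustration of how these metrics relate (a sketch, not the benchmark's own code):
```python
# per-batch agent latencies in seconds, e.g. measured as time.time() deltas
agent_latencies = [0.002, 0.004]
batch_size = 10

# requests handled per second for each batch
agent_throughput = [batch_size / latency for latency in agent_latencies]

# a single observer's latency also includes transit time, so it is slightly larger
observer_latencies = [0.002, 0.005]
observer_throughput = [1.0 / latency for latency in observer_latencies]
```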
## Requirements
This benchmark depends on PyTorch.
## How to run
For any environments you are interested in, pass the corresponding arguments to `python launcher.py`.
```python launcher.py --world-size="10,20" --master-addr="127.0.0.1" --master-port="29501" --batch="True" --state-size="10-20-10" --nlayers="5" --out-features="10" --output-file-path="benchmark_report.json"```
Example Output:
```
--------------------------------------------------------------
PyTorch distributed rpc benchmark reinforcement learning suite
--------------------------------------------------------------
master_addr : 127.0.0.1
master_port : 29501
batch : True
state_size : 10-20-10
nlayers : 5
out_features : 10
output_file_path : benchmark_report.json
x_axis_name : world_size
world_size | agent latency (seconds) agent throughput observer latency (seconds) observer throughput
p50 p75 p90 p95 p50 p75 p90 p95 p50 p75 p90 p95 p50 p75 p90 p95
10 0.002 0.002 0.002 0.002 4432 4706 4948 5128 0.002 0.003 0.003 0.003 407 422 434 443
20 0.004 0.005 0.005 0.005 4244 4620 4884 5014 0.005 0.005 0.006 0.006 191 207 215 220
```

View File

@ -1,171 +0,0 @@
import operator
import threading
import time
from functools import reduce
import torch
import torch.distributed.rpc as rpc
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical
OBSERVER_NAME = "observer{}"
class Policy(nn.Module):
def __init__(self, in_features, nlayers, out_features):
r"""
Inits policy class
Args:
in_features (int): Number of input features the model takes
nlayers (int): Number of layers in the model
out_features (int): Number of features the model outputs
"""
super().__init__()
self.model = nn.Sequential(
nn.Flatten(1, -1),
nn.Linear(in_features, out_features),
*[nn.Linear(out_features, out_features) for _ in range(nlayers)],
)
self.dim = 0
def forward(self, x):
action_scores = self.model(x)
return F.softmax(action_scores, dim=self.dim)
class AgentBase:
def __init__(self):
r"""
Inits agent class
"""
self.id = rpc.get_worker_info().id
self.running_reward = 0
self.eps = 1e-7
self.rewards = {}
self.future_actions = torch.futures.Future()
self.lock = threading.Lock()
self.agent_latency_start = None
self.agent_latency_end = None
self.agent_latency = []
self.agent_throughput = []
def reset_metrics(self):
r"""
Sets all benchmark metrics to their empty values
"""
self.agent_latency_start = None
self.agent_latency_end = None
self.agent_latency = []
self.agent_throughput = []
def set_world(self, batch_size, state_size, nlayers, out_features, batch=True):
r"""
Further initializes agent to be aware of rpc environment
Args:
batch_size (int): size of batches of observer requests to process
state_size (list): List of ints dictating the dimensions of the state
nlayers (int): Number of layers in the model
out_features (int): Number of out features in the model
batch (bool): Whether to process and respond to observer requests as a batch or 1 at a time
"""
self.batch = batch
self.policy = Policy(reduce(operator.mul, state_size), nlayers, out_features)
self.optimizer = optim.Adam(self.policy.parameters(), lr=1e-2)
self.batch_size = batch_size
for rank in range(batch_size):
ob_info = rpc.get_worker_info(OBSERVER_NAME.format(rank + 2))
self.rewards[ob_info.id] = []
self.saved_log_probs = (
[] if self.batch else {k: [] for k in range(self.batch_size)}
)
self.pending_states = self.batch_size
self.state_size = state_size
self.states = torch.zeros(self.batch_size, *state_size)
@staticmethod
@rpc.functions.async_execution
def select_action_batch(agent_rref, observer_id, state):
r"""
Receives a state from an observer and selects an action for it. Queues the observer's request
for an action until the queue size equals the batch size set during agent initialization, at which point
actions are selected for all pending observer requests and communicated back to the observers
Args:
agent_rref (RRef): RRef of this agent
observer_id (int): Observer id of observer calling this function
state (Tensor): Tensor representing current state held by observer
"""
self = agent_rref.local_value()
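# observer worker ids start at 2 (rank 0 is the coordinator, rank 1 is the agent); shift to a 0-based index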
observer_id -= 2
self.states[observer_id].copy_(state)
future_action = self.future_actions.then(
lambda future_actions: future_actions.wait()[observer_id].item()
)
with self.lock:
if self.pending_states == self.batch_size:
self.agent_latency_start = time.time()
self.pending_states -= 1
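# the last observer request in the batch triggers one forward pass over all queued states and fulfills every pending future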
if self.pending_states == 0:
self.pending_states = self.batch_size
probs = self.policy(self.states)
m = Categorical(probs)
actions = m.sample()
self.saved_log_probs.append(m.log_prob(actions).t())
future_actions = self.future_actions
self.future_actions = torch.futures.Future()
future_actions.set_result(actions)
self.agent_latency_end = time.time()
batch_latency = self.agent_latency_end - self.agent_latency_start
self.agent_latency.append(batch_latency)
self.agent_throughput.append(self.batch_size / batch_latency)
return future_action
@staticmethod
def select_action_non_batch(agent_rref, observer_id, state):
r"""
Selects an action based on the observer's state and communicates it back to the observer
Args:
agent_rref (RRef): RRef of this agent
observer_id (int): Observer id of observer calling this function
state (Tensor): Tensor representing current state held by observer
"""
self = agent_rref.local_value()
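# shift to a 0-based index (ranks 0 and 1 are the coordinator and agent)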
observer_id -= 2
agent_latency_start = time.time()
state = state.float().unsqueeze(0)
probs = self.policy(state)
m = Categorical(probs)
action = m.sample()
self.saved_log_probs[observer_id].append(m.log_prob(action))
agent_latency_end = time.time()
non_batch_latency = agent_latency_end - agent_latency_start
self.agent_latency.append(non_batch_latency)
self.agent_throughput.append(1 / non_batch_latency)
return action.item()
def finish_episode(self, rets):
r"""
Finishes the episode
Args:
rets (list): List containing rewards generated by select_action calls during the
episode run
"""
return self.agent_latency, self.agent_throughput
View File
@ -1,143 +0,0 @@
import time
import numpy as np
from agent import AgentBase
from observer import ObserverBase
import torch
import torch.distributed.rpc as rpc
COORDINATOR_NAME = "coordinator"
AGENT_NAME = "agent"
OBSERVER_NAME = "observer{}"
EPISODE_STEPS = 100
class CoordinatorBase:
def __init__(self, batch_size, batch, state_size, nlayers, out_features):
r"""
Coordinator object to run on worker. Only one coordinator exists. Responsible
for facilitating communication between agent and observers and recording benchmark
throughput and latency data.
Args:
batch_size (int): Number of observer requests to process in a batch
batch (bool): Whether to process and respond to observer requests as a batch or 1 at a time
state_size (list): List of ints dictating the dimensions of the state
nlayers (int): Number of layers in the model
out_features (int): Number of out features in the model
"""
self.batch_size = batch_size
self.batch = batch
self.agent_rref = None # Agent RRef
self.ob_rrefs = [] # Observer RRef
agent_info = rpc.get_worker_info(AGENT_NAME)
self.agent_rref = rpc.remote(agent_info, AgentBase)
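# observers live on ranks 2..world_size-1, hence the rank + 2 naming below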
for rank in range(batch_size):
ob_info = rpc.get_worker_info(OBSERVER_NAME.format(rank + 2))
ob_ref = rpc.remote(ob_info, ObserverBase)
self.ob_rrefs.append(ob_ref)
ob_ref.rpc_sync().set_state(state_size, batch)
self.agent_rref.rpc_sync().set_world(
batch_size, state_size, nlayers, out_features, self.batch
)
def run_coordinator(self, episodes, episode_steps, queue):
r"""
Runs n benchmark episodes. Each episode is started by the coordinator telling each
observer to contact the agent. Each episode is concluded by the coordinator telling the agent
to finish the episode, after which the coordinator records the benchmark data
Args:
episodes (int): Number of episodes to run
episode_steps (int): Number of steps to be run in each episode by each observer
queue (SimpleQueue): SimpleQueue from torch.multiprocessing.get_context() for
saving benchmark run results to
"""
agent_latency_final = []
agent_throughput_final = []
observer_latency_final = []
observer_throughput_final = []
for ep in range(episodes):
ep_start_time = time.time()
print(f"Episode {ep} - ", end="")
n_steps = episode_steps
futs = []
for ob_rref in self.ob_rrefs:
futs.append(
ob_rref.rpc_async().run_ob_episode(self.agent_rref, n_steps)
)
rets = torch.futures.wait_all(futs)
agent_latency, agent_throughput = self.agent_rref.rpc_sync().finish_episode(
rets
)
self.agent_rref.rpc_sync().reset_metrics()
agent_latency_final += agent_latency
agent_throughput_final += agent_throughput
observer_latency_final += [ret[2] for ret in rets]
observer_throughput_final += [ret[3] for ret in rets]
ep_end_time = time.time()
episode_time = ep_end_time - ep_start_time
print(round(episode_time, 3))
observer_latency_final = [t for s in observer_latency_final for t in s]
observer_throughput_final = [t for s in observer_throughput_final for t in s]
benchmark_metrics = {
"agent latency (seconds)": {},
"agent throughput": {},
"observer latency (seconds)": {},
"observer throughput": {},
}
print(f"For batch size {self.batch_size}")
print("\nAgent Latency - ", len(agent_latency_final))
agent_latency_final = sorted(agent_latency_final)
for p in [50, 75, 90, 95]:
v = np.percentile(agent_latency_final, p)
print("p" + str(p) + ":", round(v, 3))
p = f"p{p}"
benchmark_metrics["agent latency (seconds)"][p] = round(v, 3)
print("\nAgent Throughput - ", len(agent_throughput_final))
agent_throughput_final = sorted(agent_throughput_final)
for p in [50, 75, 90, 95]:
v = np.percentile(agent_throughput_final, p)
print("p" + str(p) + ":", int(v))
p = f"p{p}"
benchmark_metrics["agent throughput"][p] = int(v)
print("\nObserver Latency - ", len(observer_latency_final))
observer_latency_final = sorted(observer_latency_final)
for p in [50, 75, 90, 95]:
v = np.percentile(observer_latency_final, p)
print("p" + str(p) + ":", round(v, 3))
p = f"p{p}"
benchmark_metrics["observer latency (seconds)"][p] = round(v, 3)
print("\nObserver Throughput - ", len(observer_throughput_final))
observer_throughput_final = sorted(observer_throughput_final)
for p in [50, 75, 90, 95]:
v = np.percentile(observer_throughput_final, p)
print("p" + str(p) + ":", int(v))
p = f"p{p}"
benchmark_metrics["observer throughput"][p] = int(v)
if queue:
queue.put(benchmark_metrics)
View File
@ -1,253 +0,0 @@
import argparse
import json
import os
import time
from coordinator import CoordinatorBase
import torch.distributed.rpc as rpc
import torch.multiprocessing as mp
COORDINATOR_NAME = "coordinator"
AGENT_NAME = "agent"
OBSERVER_NAME = "observer{}"
TOTAL_EPISODES = 10
TOTAL_EPISODE_STEPS = 100
def str2bool(v):
if isinstance(v, bool):
return v
if v.lower() in ("yes", "true", "t", "y", "1"):
return True
elif v.lower() in ("no", "false", "f", "n", "0"):
return False
else:
raise argparse.ArgumentTypeError("Boolean value expected.")
parser = argparse.ArgumentParser(description="PyTorch RPC RL Benchmark")
parser.add_argument("--world-size", "--world_size", type=str, default="10")
parser.add_argument("--master-addr", "--master_addr", type=str, default="127.0.0.1")
parser.add_argument("--master-port", "--master_port", type=str, default="29501")
parser.add_argument("--batch", type=str, default="True")
parser.add_argument("--state-size", "--state_size", type=str, default="10-20-10")
parser.add_argument("--nlayers", type=str, default="5")
parser.add_argument("--out-features", "--out_features", type=str, default="10")
parser.add_argument(
"--output-file-path",
"--output_file_path",
type=str,
default="benchmark_report.json",
)
args = parser.parse_args()
args = vars(args)
def run_worker(
rank,
world_size,
master_addr,
master_port,
batch,
state_size,
nlayers,
out_features,
queue,
):
r"""
Inits an RPC worker
Args:
rank (int): Rpc rank of worker machine
world_size (int): Number of workers in rpc network (number of observers +
1 agent + 1 coordinator)
master_addr (str): Master address of the coordinator
master_port (str): Master port of the coordinator
batch (bool): Whether the agent will use batching or process one observer
request at a time
state_size (str): Numerical str representing the state dimensions (e.g. 5-15-10)
nlayers (int): Number of layers in model
out_features (int): Number of out features in model
queue (SimpleQueue): SimpleQueue from torch.multiprocessing.get_context() for
saving benchmark run results to
"""
state_size = list(map(int, state_size.split("-")))
batch_size = world_size - 2 # No. of observers
os.environ["MASTER_ADDR"] = master_addr
os.environ["MASTER_PORT"] = master_port
if rank == 0:
rpc.init_rpc(COORDINATOR_NAME, rank=rank, world_size=world_size)
coordinator = CoordinatorBase(
batch_size, batch, state_size, nlayers, out_features
)
coordinator.run_coordinator(TOTAL_EPISODES, TOTAL_EPISODE_STEPS, queue)
elif rank == 1:
rpc.init_rpc(AGENT_NAME, rank=rank, world_size=world_size)
else:
rpc.init_rpc(OBSERVER_NAME.format(rank), rank=rank, world_size=world_size)
rpc.shutdown()
def find_graph_variable(args):
r"""
Determines whether the user specified multiple entries for a single argument, in which case the
benchmark is run once for each of those entries. Comma-separated values in a given argument indicate multiple entries.
Output is presented so that the user can use the plot repo to plot the results with each of the
variable argument's entries on the x-axis. Args is modified accordingly.
More than one argument with multiple entries is not permitted.
Args:
args (dict): Dictionary containing arguments passed by the user (and default arguments)
"""
var_types = {
"world_size": int,
"state_size": str,
"nlayers": int,
"out_features": int,
"batch": str2bool,
}
for arg in var_types.keys():
if "," in args[arg]:
if args.get("x_axis_name"):
raise ValueError("Only 1 x axis graph variable allowed")
args[arg] = list(
map(var_types[arg], args[arg].split(","))
) # convert , separated str to list
args["x_axis_name"] = arg
else:
args[arg] = var_types[arg](args[arg]) # convert string to proper type
def append_spaces(string, length):
r"""
Returns a modified string with spaces appended to the end. If length of string argument
is greater than or equal to length, a single space is appended, otherwise x spaces are appended
where x is the difference between the length of string and the length argument
Args:
string (str): String to be modified
length (int): Size of desired return string with spaces appended
Return: (str)
"""
string = str(string)
offset = length - len(string)
if offset <= 0:
offset = 1
string += " " * offset
return string
def print_benchmark_results(report):
r"""
Prints benchmark results
Args:
report (dict): JSON formatted dictionary containing relevant data on the run of this application
"""
print("--------------------------------------------------------------")
print("PyTorch distributed rpc benchmark reinforcement learning suite")
print("--------------------------------------------------------------")
for key, val in report.items():
if key != "benchmark_results":
print(f"{key} : {val}")
x_axis_name = report.get("x_axis_name")
col_width = 7
heading = ""
if x_axis_name:
x_axis_output_label = f"{x_axis_name} |"
heading += append_spaces(x_axis_output_label, col_width)
metric_headers = [
"agent latency (seconds)",
"agent throughput",
"observer latency (seconds)",
"observer throughput",
]
percentile_subheaders = ["p50", "p75", "p90", "p95"]
subheading = ""
if x_axis_name:
subheading += append_spaces(" " * (len(x_axis_output_label) - 1), col_width)
for header in metric_headers:
heading += append_spaces(header, col_width * len(percentile_subheaders))
for percentile in percentile_subheaders:
subheading += append_spaces(percentile, col_width)
print(heading)
print(subheading)
for benchmark_run in report["benchmark_results"]:
run_results = ""
if x_axis_name:
run_results += append_spaces(
benchmark_run[x_axis_name], max(col_width, len(x_axis_output_label))
)
for metric_name in metric_headers:
percentile_results = benchmark_run[metric_name]
for percentile in percentile_subheaders:
run_results += append_spaces(percentile_results[percentile], col_width)
print(run_results)
def main():
r"""
Runs the rpc benchmark once if no argument has multiple entries, and otherwise once for each of the multiple entries.
Multiple entries are indicated by comma-separated values, and may only be used for a single argument.
Results are printed as well as saved to the output file. In case of multiple entries for a single argument,
the plot repo can be used to plot the results on the y-axis with each entry on the x-axis.
"""
find_graph_variable(args)
# run once if no x axis variables
x_axis_variables = args[args["x_axis_name"]] if args.get("x_axis_name") else [None]
ctx = mp.get_context("spawn")
queue = ctx.SimpleQueue()
benchmark_runs = []
for i, x_axis_variable in enumerate(
x_axis_variables
): # run benchmark for every x axis variable
if len(x_axis_variables) > 1:
# set x axis variable for this benchmark iteration
args[args["x_axis_name"]] = x_axis_variable
processes = []
start_time = time.time()
for rank in range(args["world_size"]):
prc = ctx.Process(
target=run_worker,
args=(
rank,
args["world_size"],
args["master_addr"],
args["master_port"],
args["batch"],
args["state_size"],
args["nlayers"],
args["out_features"],
queue,
),
)
prc.start()
processes.append(prc)
benchmark_run_results = queue.get()
for process in processes:
process.join()
print(f"Time taken benchmark run {i} -, {time.time() - start_time}")
if args.get("x_axis_name"):
# save the x axis value used for this iteration in the results
benchmark_run_results[args["x_axis_name"]] = x_axis_variable
benchmark_runs.append(benchmark_run_results)
report = args
report["benchmark_results"] = benchmark_runs
if args.get("x_axis_name"):
# x_axis_name was a variable, so don't save a constant in the report for it
del report[args["x_axis_name"]]
with open(args["output_file_path"], "w") as f:
json.dump(report, f)
print_benchmark_results(report)
if __name__ == "__main__":
main()
View File
@ -1,78 +0,0 @@
import random
import time
from agent import AgentBase
import torch
import torch.distributed.rpc as rpc
from torch.distributed.rpc import rpc_sync
class ObserverBase:
def __init__(self):
r"""
Inits observer class
"""
self.id = rpc.get_worker_info().id
def set_state(self, state_size, batch):
r"""
Further initializes observer to be aware of rpc environment
Args:
state_size (list): List of integers denoting dimensions of state
batch (bool): Whether agent will be using batch select action
"""
self.state_size = state_size
self.select_action = (
AgentBase.select_action_batch
if batch
else AgentBase.select_action_non_batch
)
def reset(self):
r"""
Resets state randomly
"""
state = torch.rand(self.state_size)
return state
def step(self, action):
r"""
Generates random state and reward
Args:
action (int): Int received from agent representing action to take on state
"""
state = torch.rand(self.state_size)
reward = random.randint(0, 1)
return state, reward
def run_ob_episode(self, agent_rref, n_steps):
r"""
Runs a single observer episode where, for n_steps, an action is selected
from the agent based on the current state and the state is updated
Args:
agent_rref (RRef): Remote Reference to the agent
n_steps (int): Number of times to select an action to transform state per episode
"""
state, ep_reward = self.reset(), None
rewards = torch.zeros(n_steps)
observer_latencies = []
observer_throughput = []
for st in range(n_steps):
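# each rpc_sync round trip (request transit + agent processing + response transit) is one observer latency sample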
ob_latency_start = time.time()
action = rpc_sync(
agent_rref.owner(),
self.select_action,
args=(agent_rref, self.id, state),
)
ob_latency = time.time() - ob_latency_start
observer_latencies.append(ob_latency)
observer_throughput.append(1 / ob_latency)
state, reward = self.step(action)
rewards[st] = reward
return [rewards, ep_reward, observer_latencies, observer_throughput]