ps sparse rpc (#58003)

Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/58003

adds trainer class DdpTrainer
adds trainer class DdpSparseRpcTrainer
adds server class ParameterServerBase
adds server class AverageParameterServer
adds experiment ddp_cpu_sparse_rpc_nccl_allreduce
adds experiment ddp_cuda_sparse_rpc_nccl_allreduce

Quip document: https://fb.quip.com/iQUtAeKIxWpF

Test Plan: Imported from OSS

Reviewed By: albanD

Differential Revision: D29379696

Pulled By: gcramer23

fbshipit-source-id: 9cf5fb7398ba2fa3eb694afbddc4ed00d97f205f
Author: Garrett Cramer
Date: 2021-06-24 17:20:33 -07:00
Committed by: Facebook GitHub Bot
Parent: fadaa52f64
Commit: 4ed2d5d9bb
17 changed files with 1051 additions and 202 deletions

View File

@@ -1,15 +0,0 @@
from dataclasses import dataclass
@dataclass
class BenchmarkConfigurations:
trainer_count: int = 1
ps_count: int = 0
batch_size: int = 1
print_metrics_to_dir: bool = False
master_addr: str = "localhost"
master_port: str = "29500"
rpc_async_timeout: int = 5
rpc_init_method: str = "tcp://localhost:29501"
trainer_config: dict = None
ps_config: dict = None

View File

@@ -1,12 +1,19 @@
from data.DummyData import DummyData
from models.DummyModel import DummyModel
from servers.AverageParameterServer import AverageParameterServer
from trainers.DdpNcclTrainer import DdpNcclTrainer
from trainers.DdpSparseRpcTrainer import DdpSparseRpcTrainer
from trainers.DdpTrainer import DdpTrainer
trainer_map = {
"DdpNcclTrainer": DdpNcclTrainer
"DdpNcclTrainer": DdpNcclTrainer,
"DdpTrainer": DdpTrainer,
"DdpSparseRpcTrainer": DdpSparseRpcTrainer
}
ps_map = {}
server_map = {
"AverageParameterServer": AverageParameterServer
}
model_map = {
"DummyModel": DummyModel
@@ -21,8 +28,8 @@ def get_benchmark_trainer_map():
return trainer_map
def get_benchmark_ps_map():
return ps_map
def get_benchmark_server_map():
return server_map
def get_benchmark_model_map():

View File

@@ -1,8 +0,0 @@
{
"3": {
"trainer_count": 2,
"ps_count": 0,
"rpc_async_timeout": 15,
"batch_size": 5
}
}

View File

@@ -1,8 +0,0 @@
{
"DdpNcclTrainer": {
"trainer_class": "DdpNcclTrainer",
"configurations": {
"epochs": 10
}
}
}

View File

@@ -0,0 +1,23 @@
#!/bin/sh
cd "$(dirname "$0")"
cd ..
python -u launcher.py \
--master_addr="localhost" \
--master_port="29500" \
--trainer="DdpSparseRpcTrainer" \
--ntrainer=2 \
--ncudatrainer=0 \
--filestore="/tmp/tmpn_k_8so02" \
--server="AverageParameterServer" \
--nserver=1 \
--ncudaserver=0 \
--rpc_timeout=30 \
--backend="nccl" \
--epochs=10 \
--batch_size=10 \
--data="DummyData" \
--model="DummyModelSparse" \
--data_config_path="configurations/data_configurations.json" \
--model_config_path="configurations/model_configurations.json"

View File

@@ -0,0 +1,23 @@
#!/bin/sh
cd "$(dirname "$0")"
cd ..
python -u launcher.py \
--master_addr="localhost" \
--master_port="29500" \
--trainer="DdpSparseRpcTrainer" \
--ntrainer=0 \
--ncudatrainer=2 \
--filestore="/tmp/tmpn_k_8so02" \
--server="AverageParameterServer" \
--nserver=0 \
--ncudaserver=1 \
--rpc_timeout=30 \
--backend="nccl" \
--epochs=10 \
--batch_size=10 \
--data="DummyData" \
--model="DummyModelSparse" \
--data_config_path="configurations/data_configurations.json" \
--model_config_path="configurations/model_configurations.json"

View File

@@ -4,8 +4,19 @@ cd "$(dirname "$0")"
cd ..
python -u launcher.py \
--benchmark="3" \
--data="DummyData" \
--model="DummyModel" \
--server="None" \
--trainer="DdpNcclTrainer"
--master_addr="localhost" \
--master_port="29500" \
--trainer="DdpTrainer" \
--ntrainer=2 \
--ncudatrainer=0 \
--filestore="/tmp/tmpn_k_8so02" \
--nserver=0 \
--ncudaserver=0 \
--rpc_timeout=30 \
--backend="nccl" \
--epochs=10 \
--batch_size=10 \
--data="DummyData" \
--model="DummyModel" \
--data_config_path="configurations/data_configurations.json" \
--model_config_path="configurations/model_configurations.json"

View File

@@ -4,72 +4,100 @@ import json
import os
from pathlib import Path
import torch
import torch.distributed as c10d
import torch.distributed.rpc as rpc
import torch.multiprocessing as mp
from torch.distributed.rpc import TensorPipeRpcBackendOptions
from torch.futures import wait_all
from torch.utils.data import DataLoader
from benchmark_class_helper import (get_benchmark_data_map,
get_benchmark_model_map,
get_benchmark_ps_map,
get_benchmark_server_map,
get_benchmark_trainer_map)
from BenchmarkConfigurations import BenchmarkConfigurations
from metrics.ProcessedMetricsPrinter import ProcessedMetricsPrinter
USE_CUDA_RPC = "use_cuda_rpc"
def get_name(rank, configs):
t_count = configs.trainer_count
ps_count = configs.ps_count
def get_name(rank, args):
t_count = args.ntrainer + args.ncudatrainer
s_count = args.nserver + args.ncudaserver
if rank < t_count:
return f"trainer{rank}"
elif rank < (t_count + ps_count):
return f"ps{rank}"
elif rank < (t_count + s_count):
return f"server{rank}"
else:
return "master"
def get_parameter_server_rank(rank, config):
# rank mod parameter server count to get parameter server number
# add trainer_count to get parameter server rank
rank_mod_ps_count = rank % config.ps_count
return rank_mod_ps_count + config.trainer_count
def get_server_rank(args, rank):
s_offset = args.ntrainer + args.ncudatrainer
tps = args.ntrainer // args.nserver
return rank // tps + s_offset
def get_ps_rref(parameter_server_rank, config):
ps_config = config.ps_config
ps = get_benchmark_ps_map()[str(ps_config["ps_class"])]
def get_cuda_server_rank(args, rank):
s_offset = args.ntrainer + args.ncudatrainer + args.nserver
t_index = rank - args.ntrainer
ctps = args.ncudatrainer // args.ncudaserver
return t_index // ctps + s_offset
def get_server_rref(server_rank, args, extra_args):
server = get_benchmark_server_map()[str(args.server)]
name = get_name(
parameter_server_rank,
config
server_rank,
args
)
ps_args = ps_config["configurations"].values()
ps_trainer_count = config.trainer_count / ps_config.ps_count
rem = config.trainer_count % ps_config.ps_count
if parameter_server_rank - config.trainer_count < rem:
ps_trainer_count += 1
if extra_args is not None:
server_args = extra_args.values()
else:
server_args = []
if server_rank >= args.ntrainer + args.ncudatrainer + args.nserver:
trainer_count = args.ncudatrainer / args.ncudaserver
use_cuda_rpc = True
else:
trainer_count = args.ntrainer / args.nserver
use_cuda_rpc = False
return rpc.remote(
name,
ps,
server,
args=(
parameter_server_rank,
ps_trainer_count,
*ps_args,
server_rank,
trainer_count,
use_cuda_rpc,
*server_args,
),
)
def run_trainer(
config, model, data, rank, ps_rref
args, extra_args, model, data, rank, server_rref
):
trainer_config = config.trainer_config
trainer_class = get_benchmark_trainer_map()[str(trainer_config["trainer_class"])]
trainer_args = trainer_config["configurations"].values()
trainer_class = get_benchmark_trainer_map()[str(args.trainer)]
if extra_args is not None:
trainer_args = extra_args.values()
else:
trainer_args = []
trainer_count = args.ntrainer + args.ncudatrainer
store = c10d.FileStore(args.filestore, trainer_count)
if args.backend == "gloo":
process_group = c10d.ProcessGroupGloo(
store, rank, trainer_count
)
elif args.backend == "nccl":
process_group = c10d.ProcessGroupNCCL(
store, rank, trainer_count
)
use_cuda_rpc = rank >= args.ntrainer
trainer = trainer_class(
rank,
config.trainer_count,
ps_rref,
args.ntrainer + args.ncudatrainer,
process_group,
use_cuda_rpc,
server_rref,
args.backend,
args.epochs,
*trainer_args
)
trainer.train(model, data)
@@ -77,48 +105,44 @@ def run_trainer(
return [rank, metrics]
def call_trainers(config, model, train_data, parameter_server_rrefs):
def call_trainers(args, extra_args, model, train_data, server_rrefs):
futs = []
for trainer_rank in range(0, config.trainer_count):
for trainer_rank in range(0, args.ntrainer + args.ncudatrainer):
trainer_name = get_name(
trainer_rank,
config
args
)
ps_rref = None
if parameter_server_rrefs:
ps_rank = get_parameter_server_rank(trainer_rank, config)
ps_rref = parameter_server_rrefs[ps_rank]
server_rref = None
if server_rrefs:
if trainer_rank >= args.ntrainer:
server_rank = get_cuda_server_rank(args, trainer_rank)
else:
server_rank = get_server_rank(args, trainer_rank)
server_rref = server_rrefs[server_rank]
fut = rpc.rpc_async(
trainer_name,
run_trainer,
args=(
config,
args,
extra_args,
copy.deepcopy(model),
train_data[trainer_rank],
trainer_rank,
ps_rref,
server_rref,
),
timeout=config.rpc_async_timeout
timeout=args.rpc_timeout
)
futs.append(fut)
return futs
def benchmark_warmup(
config, model, data, parameter_server_rrefs
args, extra_args, model, data, server_rrefs
):
if config.ps_count > 0:
ps_config = config.ps_config
ps = get_benchmark_ps_map()[str(ps_config["ps_class"])]
futs = call_trainers(config, model, data, parameter_server_rrefs)
for fut in futs:
fut.wait()
for ps_rref in parameter_server_rrefs.values():
rpc.rpc_sync(
ps_rref.owner(),
ps.reset_state,
args=(ps_rref,)
)
futs = call_trainers(args, extra_args, model, data, server_rrefs)
wait_all(futs)
for server_rref in server_rrefs.values():
server_rref.rpc_sync().reset_state(server_rref)
print("benchmark warmup done\n")
@@ -126,84 +150,88 @@ def split_list(arr, n):
return [arr[i::n] for i in range(n)]
def run_master(rank, model, data, config, rpc_backend_options):
world_size = config.trainer_count + config.ps_count + 1
def get_server_metrics(server_rrefs):
rank_metrics = []
for rank, server_rref in server_rrefs.items():
metrics = server_rref.rpc_sync().get_metrics(server_rref)
rank_metrics.append([rank, metrics])
return rank_metrics
def run_master(rank, model, data, args, extra_configs, rpc_backend_options):
world_size = args.ntrainer + args.ncudatrainer + args.nserver + args.ncudaserver + 1
rpc.init_rpc(
get_name(
rank,
config
args
),
rank=rank,
world_size=world_size,
rpc_backend_options=rpc_backend_options
)
parameter_server_rrefs = {}
server_rrefs = {}
for i in range(
config.trainer_count, world_size - 1
args.ntrainer + args.ncudatrainer, world_size - 1
):
parameter_server_rrefs[i] = get_ps_rref(i, config)
server_rrefs[i] = get_server_rref(i, args, extra_configs["server_config"])
train_data = split_list(
list(DataLoader(data, batch_size=config.batch_size)),
config.trainer_count
list(DataLoader(data, batch_size=args.batch_size)),
args.ntrainer + args.ncudatrainer
)
# warmup run the benchmark
benchmark_warmup(
config, model, train_data, parameter_server_rrefs
args, extra_configs["trainer_config"], model, train_data, server_rrefs
)
# run the benchmark
trainer_futs = call_trainers(
config, model, train_data, parameter_server_rrefs
args, extra_configs["trainer_config"], model, train_data, server_rrefs
)
# collect metrics and print
metrics_printer = ProcessedMetricsPrinter()
rank_metrics_list = [fut.wait() for fut in trainer_futs]
rank_metrics_list = wait_all(trainer_futs)
metrics_printer.print_metrics("trainer", rank_metrics_list)
rank_metrics_list = get_server_metrics(server_rrefs)
metrics_printer.print_metrics("parameter server", rank_metrics_list)
def run_benchmark(rank, model, data, config):
def run_benchmark(rank, model, data, args, config):
world_size = config.trainer_count + config.ps_count + 1
os.environ['MASTER_ADDR'] = config.master_addr
os.environ['MASTER_PORT'] = config.master_port
rpc_backend_options = TensorPipeRpcBackendOptions()
rpc_backend_options.init_method = config.rpc_init_method
torch.manual_seed(args.torch_seed)
torch.cuda.manual_seed_all(args.cuda_seed)
torch.backends.cudnn.benchmark = True
torch.backends.cudnn.deterministic = True
world_size = args.ntrainer + args.ncudatrainer + args.nserver + args.ncudaserver + 1
os.environ['MASTER_ADDR'] = args.master_addr
os.environ['MASTER_PORT'] = args.master_port
rpc_backend_options = TensorPipeRpcBackendOptions(rpc_timeout=args.rpc_timeout)
if rank == world_size - 1:
# master = [trainer_count + parameter_server_count, trainer_count + parameter_server_count]
run_master(rank, model, data, config, rpc_backend_options)
elif rank >= config.trainer_count:
# parameter_servers = [trainer_count, trainer_count + parameter_server_count)
# master = [ntrainer + ncudatrainer + nserver + ncudaserver, ntrainer + ncudatrainer + nserver + ncudaserver]
run_master(rank, model, data, args, config, rpc_backend_options)
elif rank >= args.ntrainer + args.ncudatrainer:
# parameter_servers = [ntrainer + ncudatrainer, ntrainer + ncudatrainer + nserver + ncudaserver)
rpc.init_rpc(
get_name(
rank,
config
args
),
rank=rank,
world_size=world_size,
rpc_backend_options=rpc_backend_options
)
else:
# trainers = [0, trainer_count)
trainer_config = config.trainer_config
ps_config = config.ps_config
if (USE_CUDA_RPC in trainer_config and
trainer_config[USE_CUDA_RPC] and
USE_CUDA_RPC in ps_config and
ps_config[USE_CUDA_RPC] and
config.ps_count > 0):
ps_rank = get_parameter_server_rank(rank, config)
ps_name = get_name(
ps_rank,
config
)
# trainers = [0, ntrainer + ncudatrainer)
if rank >= args.ntrainer:
server_rank = get_cuda_server_rank(args, rank)
server_name = get_name(server_rank, args)
rpc_backend_options.set_device_map(
ps_name,
{rank: ps_rank}
server_name,
{rank: server_rank}
)
trainer_name = get_name(
rank,
config
args
)
rpc.init_rpc(
trainer_name,
@@ -221,16 +249,18 @@ def get_json_config(file_name, id):
return json_config
def load_configurations(args):
def load_extra_configs(args):
trainer_config_file = args.trainer_config_path
ps_config_file = args.server_config_path
benchmark_config = get_json_config(args.benchmark_config_path, args.benchmark)
benchmark_config["trainer_config"] = get_json_config(trainer_config_file, args.trainer)
if args.server != "None":
benchmark_config["ps_config"] = get_json_config(ps_config_file, args.server)
else:
benchmark_config["ps_config"] = None
return BenchmarkConfigurations(**benchmark_config)
server_config_file = args.server_config_path
configurations = {
"trainer_config": None,
"server_config": None
}
if args.trainer is not None and trainer_config_file is not None:
configurations["trainer_config"] = get_json_config(trainer_config_file, args.trainer)
if args.server is not None and server_config_file is not None:
configurations["server_config"] = get_json_config(server_config_file, args.server)
return configurations
def get_data(data_class, data_config):
@@ -255,43 +285,106 @@ def load_model(args):
return get_model(model_config["model_class"], model_config["configurations"])
def main():
parser = argparse.ArgumentParser(description="RPC PS Benchmark")
def main(args):
# CPU and RPC trainer checks
if args.ntrainer > 0 and args.ncudatrainer > 0:
assert args.nserver > 0 and args.ncudaserver > 0
if args.nserver > 0:
assert args.ntrainer > 0
assert args.ntrainer % args.nserver == 0
if args.ncudaserver > 0:
assert args.ncudatrainer > 0
assert args.ncudatrainer % args.ncudaserver == 0
extra_configs = load_extra_configs(args)
data = load_data(args)
model = load_model(args)
world_size = (
args.ntrainer + args.ncudatrainer + args.nserver + args.ncudaserver + 1
)
mp.spawn(
run_benchmark,
args=(
model,
data,
args,
extra_configs,
),
nprocs=world_size,
join=True
)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="RPC server Benchmark")
parser.add_argument(
"--benchmark_config_path",
"--master_addr",
type=str,
default="configurations/benchmark_configurations.json",
help="path to benchmark configuration file"
help="IP address of the machine that will host the process with rank 0"
)
parser.add_argument(
"--data_config_path",
"--master_port",
type=str,
default="configurations/data_configurations.json",
help="path to data configuration file"
help="A free port on the machine that will host the process with rank 0"
)
parser.add_argument(
"--model_config_path",
"--trainer",
type=str,
default="configurations/model_configurations.json",
help="path to model configuration file"
help="trainer map key to get trainer class for benchmark run"
)
parser.add_argument(
"--server_config_path",
type=str,
default="configurations/server_configurations.json",
help="path to server configuration file"
"--ntrainer",
type=int,
help="trainer count for benchmark run"
)
parser.add_argument(
"--trainer_config_path",
type=str,
default="configurations/trainer_configurations.json",
help="path to trainer configuration file"
"--ncudatrainer",
type=int,
help="cudatrainer count for benchmark run"
)
parser.add_argument(
"--benchmark",
"--filestore",
type=str,
help="id for benchmark configuration"
help="filestore location for process group"
)
parser.add_argument(
"--server",
type=str,
help="server map key to get trainer class for benchmark run"
)
parser.add_argument(
"--nserver",
type=int,
help="server count for benchmark run"
)
parser.add_argument(
"--ncudaserver",
type=int,
help="cudaserver count for benchmark run"
)
parser.add_argument(
"--rpc_timeout",
type=int,
help="timeout in seconds to use for RPC"
)
parser.add_argument(
"--backend",
type=str,
help="distributed communication backend to use for benchmark run"
)
parser.add_argument(
"--epochs",
type=int,
help="epoch count for training"
)
parser.add_argument(
"--batch_size",
type=int,
default=1,
help="number of training examples used in one iteration"
)
parser.add_argument(
"--data",
@@ -304,35 +397,37 @@ def main():
help="id for model configuration"
)
parser.add_argument(
"--server",
"--data_config_path",
type=str,
help="id for parameter server configuration"
help="path to data configuration file"
)
parser.add_argument(
"--trainer",
"--model_config_path",
type=str,
help="id for trainer configuration"
help="path to model configuration file"
)
parser.add_argument(
"--server_config_path",
type=str,
help="path to server configuration file"
)
parser.add_argument(
"--trainer_config_path",
type=str,
help="path to trainer configuration file"
)
parser.add_argument(
"--torch_seed",
type=int,
default=0,
help="seed for generating random numbers to a non-deterministic random number"
)
parser.add_argument(
"--cuda_seed",
type=int,
default=0,
help="seed for generating random numbers to a random number for the current GPU"
)
args = parser.parse_args()
print(f"{args}\n")
config = load_configurations(args)
data = load_data(args)
model = load_model(args)
world_size = config.trainer_count + config.ps_count + 1
mp.spawn(
run_benchmark,
args=(
model,
data,
config,
),
nprocs=world_size,
join=True
)
if __name__ == "__main__":
main()
main(args)
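
The rank layout assumed throughout launcher.py (trainers first, then servers, with the master on the final rank) can be sketched in plain Python. The helper below is hypothetical and only mirrors get_name above; the counts are taken from the CPU sparse RPC script (ntrainer=2, ncudatrainer=0, nserver=1, ncudaserver=0).

# Hypothetical sketch of the rank partition used by get_name/run_benchmark.
def get_role(rank, ntrainer, ncudatrainer, nserver, ncudaserver):
    trainer_count = ntrainer + ncudatrainer
    server_count = nserver + ncudaserver
    if rank < trainer_count:
        return f"trainer{rank}"
    elif rank < trainer_count + server_count:
        return f"server{rank}"
    return "master"

world_size = 2 + 0 + 1 + 0 + 1  # ntrainer + ncudatrainer + nserver + ncudaserver + 1
print([get_role(r, 2, 0, 1, 0) for r in range(world_size)])
# ['trainer0', 'trainer1', 'server2', 'master']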

View File

@@ -0,0 +1,143 @@
import threading
import torch
import torch.distributed.rpc as rpc
from utils import sparse_rpc_format_to_tensor, sparse_tensor_to_rpc_format
from .ParameterServerBase import ParameterServerBase
class AverageParameterServer(ParameterServerBase):
lock = threading.Lock()
def __init__(
self,
rank,
trainer_count,
use_cuda_rpc
):
r"""
A parameter server that averages the gradients
from trainers for each training iteration step.
Gradients are added as they are received from trainers.
When all gradients have been received, the sum is
divided by the number of trainers.
Args:
rank (int): worker rank
trainer_count (int): count of trainers sending
gradients to the server
use_cuda_rpc (bool): indicator for CUDA RPC
"""
super().__init__(rank)
self.rank = rank
self.trainer_count = trainer_count
self.use_cuda_rpc = use_cuda_rpc
self.batch_number = 0
self.futures = {}
self.gradient_dict = {}
@staticmethod
def reset_state(server_rref):
r"""
A method that clears the state of the server.
Args:
server_rref (RRef): remote reference to the server
"""
self = server_rref.local_value()
self.batch_number = 0
self.futures.clear()
self.gradient_dict.clear()
self.clear_metrics()
def param_key(self, param_loc):
r"""
A method that returns an encoded key that represents
the current batch and param location.
Args:
param_loc (int): bucket location sent by the trainer
containing the gradient
"""
return f"{self.batch_number},{param_loc}"
def clear_batch_state(self):
r"""
Clears the current server batch state.
"""
self.futures.clear()
self.gradient_dict.clear()
def process_gradient(self, gradient, param_loc):
r"""
Stores the gradient if param_loc is not in gradient_dict.
Adds the gradient to param_loc if it is in gradient_dict.
Args:
gradient (torch.Tensor): tensor sent from trainer
param_loc (int): bucket location sent by the trainer
containing the gradient
"""
if param_loc not in self.gradient_dict:
self.record_straggler_start(self.param_key(param_loc))
self.record_batch_start(self.param_key(param_loc))
self.gradient_dict[param_loc] = gradient
else:
self.gradient_dict[param_loc] += gradient
@ParameterServerBase.record_method(name="average computation")
def average(self, param_loc):
r"""
Obtains the tensor at the param_loc in the gradient_dict
and then divides it by the number of trainers.
Args:
param_loc (int): bucket location sent by the trainer
containing the gradient
"""
param_loc_avg = self.gradient_dict[param_loc]
param_loc_avg /= (1.0 * self.trainer_count)
return param_loc_avg
@staticmethod
@rpc.functions.async_execution
def average_gradient(
server_rref,
received_batch_number,
param_loc,
gradient
):
r"""
An asynchronous function that will average gradients
sent from trainers.
Args:
server_rref (RRef): remote reference to the server
received_batch_number (int): batch number sent by
the trainer
param_loc (int): bucket location sent by the trainer
containing the gradient
gradient (torch.Tensor or list): tensor sent by the trainer
"""
self = server_rref.local_value()
if type(gradient) is list:
gradient = sparse_rpc_format_to_tensor(gradient)
gradient = gradient.cuda(self.rank)
fut = torch.futures.Future()
with self.lock:
if self.batch_number < received_batch_number:
self.batch_number = received_batch_number
self.clear_batch_state()
self.process_gradient(gradient, param_loc)
if param_loc not in self.futures:
self.futures[param_loc] = []
self.futures[param_loc].append(fut)
if len(self.futures[param_loc]) == self.trainer_count:
self.record_straggler_end(self.param_key(param_loc))
param_loc_avg = self.average(param_loc)
if not self.use_cuda_rpc:
param_loc_avg = param_loc_avg.cpu()
if param_loc_avg.is_sparse:
param_loc_avg = sparse_tensor_to_rpc_format(param_loc_avg)
for cur_fut in self.futures[param_loc]:
cur_fut.set_result(param_loc_avg)
self.record_batch_end(self.param_key(param_loc))
return fut
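
Outside of RPC, the averaging contract documented above reduces to summing each bucket location's gradients as they arrive and dividing by the trainer count once every trainer has reported. The snippet below is a hypothetical single-process sketch of that arithmetic; it assumes only PyTorch and does not touch the server's future/lock machinery.

# Hypothetical single-process sketch of the averaging contract.
import torch

trainer_count = 3
gradient_sum = None
for gradient in (torch.ones(4), 2 * torch.ones(4), 3 * torch.ones(4)):
    # process_gradient: store the first gradient, add the rest
    gradient_sum = gradient.clone() if gradient_sum is None else gradient_sum + gradient
# average: divide the accumulated sum by the number of trainers
average = gradient_sum / (1.0 * trainer_count)
assert torch.equal(average, 2 * torch.ones(4))  # each trainer would receive this tensor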

View File

@@ -0,0 +1,170 @@
import functools
import time
from abc import ABC, abstractmethod
from metrics.MetricsLogger import MetricsLogger
class ParameterServerBase(ABC):
PARAMETER_SERVER_BATCH_METRIC = "parameter_server_batch_metric"
PARAMETER_SERVER_STRAGGLER_METRIC = "parameter_server_straggler_metric"
PARAM_INDEX_STRAGGLER = "param_index_straggler"
PARAM_INDEX_BATCH = "param_index_batch"
def __init__(self, rank):
r"""
Inits ParameterServerBase class.
Args:
rank (int): worker rank
"""
self.__metrics_logger = MetricsLogger(rank)
@abstractmethod
def process_gradient(self):
r"""
A method to be implemented by child class that will process a
gradient received by a server.
"""
return
@staticmethod
@abstractmethod
def average_gradient():
r"""
A method to be implemented by child class that will average
gradients.
"""
return
@staticmethod
@abstractmethod
def reset_state():
r"""
A method to be implemented by child class that will reset
the server state.
"""
return
def record_start(self, type, key, name, cuda=True):
r"""
A method that records the start event for a metric.
Args:
type (str): group id for metric
key (str): unique id for metric within a group
name (str): description of the metric
cuda (bool): indicator to determine if this is a CUDA metric
"""
self.__metrics_logger.record_start(
type,
key,
name,
cuda
)
def record_end(self, type, key):
r"""
A method that records the end event for a metric.
Args:
type (str): group id for metric
key (str): unique id for metric within a group
"""
self.__metrics_logger.record_end(
type,
key
)
def record_straggler_start(self, key, cuda=True):
r"""
A helper method that records a straggler metric
for the given key. A user should call this when
the first gradient for the param location is received.
Args:
key (str): unique id for metric within a group
cuda (bool): indicator to determine if this is a CUDA metric
"""
self.__metrics_logger.record_start(
self.PARAMETER_SERVER_STRAGGLER_METRIC,
key,
self.PARAM_INDEX_STRAGGLER,
cuda
)
def record_straggler_end(self, key):
r"""
A helper method that records a straggler metric
for the given key. A user should call this when
the last gradient for the param location is received.
Args:
key (str): unique id for metric within a group
"""
self.__metrics_logger.record_end(
self.PARAMETER_SERVER_STRAGGLER_METRIC,
key
)
def record_batch_start(self, key, cuda=True):
r"""
A helper method that records a batch metric
for the given key. A user should call this when
the first gradient for the param location is received.
Args:
key (str): unique id for metric within a group
cuda (bool): indicator to determine if this is a CUDA metric
"""
self.__metrics_logger.record_start(
self.PARAMETER_SERVER_BATCH_METRIC,
key,
self.PARAM_INDEX_BATCH,
cuda
)
def record_batch_end(self, key):
r"""
A helper method that records a batch metric
for the given key. A user should call this when
all futures for a param location have had their
result set.
Args:
key (str): unique id for metric within a group
"""
self.__metrics_logger.record_end(
self.PARAMETER_SERVER_BATCH_METRIC,
key
)
@staticmethod
def record_method(name, type="method_metric", cuda=True):
r"""
A decorator that records a metric for the decorated method.
Args:
name (str): description of the metric
type (str): group id for metric
cuda (bool): indicator to determine if this is a CUDA metric
"""
def decorator(function):
@functools.wraps(function)
def wrapper(self, *args):
key = time.time()
self.__metrics_logger.record_start(type, key, name, cuda)
result = function(self, *args)
self.__metrics_logger.record_end(type, key)
return result
return wrapper
return decorator
@staticmethod
def get_metrics(server_rref):
r"""
A staticmethod that returns metrics captured by the __metrics_logger.
Args:
server_rref (RRef): remote reference to the server
"""
self = server_rref.local_value()
return self.__metrics_logger.get_processed_metrics()
def clear_metrics(self):
r"""
A method that clears __metrics_logger recorded metrics.
"""
return self.__metrics_logger.clear_metrics()
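
record_method above is a standard wrap-and-time decorator. A stripped-down sketch of the same pattern follows; the print calls stand in for MetricsLogger.record_start/record_end, and the ToyServer class is purely illustrative.

# Stripped-down sketch of the record_method decorator pattern.
import functools
import time

def record_method(name, type="method_metric"):
    def decorator(function):
        @functools.wraps(function)
        def wrapper(self, *args):
            key = time.time()  # unique key per invocation, as in the class above
            print(f"record_start type={type} key={key} name={name}")
            result = function(self, *args)
            print(f"record_end type={type} key={key}")
            return result
        return wrapper
    return decorator

class ToyServer:
    @record_method(name="average computation")
    def average(self, total, trainer_count):
        return total / trainer_count

print(ToyServer().average(6.0, 3))  # logs a start/end pair, then prints 2.0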

View File

@@ -21,3 +21,11 @@ def run_script(script_name):
def test_ddp_nccl_allreduce():
run_script("ddp_nccl_allreduce.sh")
def test_ddp_cpu_sparse_rpc_nccl_allreduce():
run_script("ddp_cpu_sparse_rpc_nccl_allreduce.sh")
def test_ddp_cuda_sparse_rpc_nccl_allreduce():
run_script("ddp_cuda_sparse_rpc_nccl_allreduce.sh")

View File

@@ -0,0 +1,54 @@
from utils import process_bucket_with_remote_server
from .DdpTrainer import DdpTrainer
class DdpSparseRpcTrainer(DdpTrainer):
def __init__(self, rank, trainer_count, process_group, use_cuda_rpc, server_rref, backend, epochs):
r"""
A trainer that implements a DDP training algorithm using a server and process group
allreduce. The trainer sends sparse gradients using RPC, and the server averages and
returns the gradients. The process group uses the backend allreduce implementation
to average the dense gradients.
Args:
rank (int): worker rank
trainer_count (int): count of trainers in the world
process_group (ProcessGroup): distributed process group
use_cuda_rpc (bool): indicator for CUDA RPC
server_rref (RRef): remote reference to the server
backend (str): distributed communication backend
epochs (int): epoch count for training
"""
super().__init__(rank, trainer_count, process_group, use_cuda_rpc, server_rref, backend, epochs)
@staticmethod
def hook(state, bucket):
r"""
A ddp communication hook that uses the current backend allreduce
implementation for dense tensors and a server for sparse tensors.
Args:
state (object): maintains state during the training process
bucket (GradBucket): gradient bucket
"""
tensor = bucket.get_tensor()
if tensor.is_sparse:
return process_bucket_with_remote_server(state, bucket)
else:
cref = state.cref
tensor = [tensor / state.process_group.size()]
key = state.get_key(bucket.get_index())
cref.record_hook_fut_start(key, f"{cref.backend}_dense_allreduce")
fut = state.process_group.allreduce(tensor).get_future()
def callback(fut):
cref.record_hook_fut_end(key)
return fut.wait()
return fut.then(callback)
def get_hook(self):
r"""
returns DdpSparseRpcTrainer.hook
"""
return DdpSparseRpcTrainer.hook
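
The only decision this hook adds on top of DdpTrainer.hook is routing: sparse bucket tensors go to the parameter server over RPC, dense ones take the process-group allreduce. Below is a hedged, non-distributed sketch of just that branch; no process group or RPC is involved.

# Hedged sketch of the sparse/dense routing decision only; the real hook calls
# process_bucket_with_remote_server or process_group.allreduce respectively.
import torch

def route(tensor):
    return "rpc_sparse_server" if tensor.is_sparse else "process_group_allreduce"

dense_grad = torch.randn(8)
sparse_grad = torch.sparse_coo_tensor([[0, 3]], [1.0, 2.0], (8,))
print(route(dense_grad), route(sparse_grad))
# process_group_allreduce rpc_sparse_server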

View File

@@ -0,0 +1,183 @@
import torch
import torch.nn as nn
from torch.nn.parallel import DistributedDataParallel as DDP
from .DdpTrainerBase import DdpTrainerBase
class DdpTrainer(DdpTrainerBase):
class HookState:
def __init__(self, cref, process_group):
r"""
A class that holds state information that is needed by the communication hook
during the training algorithm.
Args:
cref (DdpTrainer): reference to the self keyword of the trainer instance
process_group (ProcessGroup): distributed process group
"""
self.cref = cref
self.process_group = process_group
self.batch_number = -1
def get_key(self, bucket_index):
r"""
A method that returns an encoded key that represents the current batch and
bucket index.
Args:
bucket_index (int): index of the bucket being processed in backward
"""
return f"{self.batch_number},{bucket_index}"
def next_batch(self):
r"""
A method that increments batch_number by 1.
"""
self.batch_number += 1
def __init__(self, rank, trainer_count, process_group, use_cuda_rpc, server_rref, backend, epochs):
r"""
A trainer that implements a DDP training algorithm using a simple hook that performs allreduce
using the process_group implementation.
Args:
rank (int): worker rank
trainer_count (int): count of trainers in the world
process_group (ProcessGroup): distributed process group
use_cuda_rpc (bool): indicator for CUDA RPC
server_rref (RRef): remote reference to the server
backend (str): distributed communication backend
epochs (int): epoch count for training
"""
super().__init__(rank)
self.rank = rank
self.trainer_count = trainer_count
self.process_group = process_group
self.use_cuda_rpc = use_cuda_rpc
self.server_rref = server_rref
self.backend = backend
self.epochs = epochs
@staticmethod
def hook(state, bucket):
r"""
A ddp communication hook that uses the process_group allreduce implementation.
Args:
state (object): maintains state during the training process
bucket (GradBucket): gradient bucket
"""
cref = state.cref
tensors = [bucket.get_tensor() / state.process_group.size()]
key = state.get_key(bucket.get_index())
cref.record_hook_fut_start(key, f"{cref.backend}_allreduce")
fut = state.process_group.allreduce(tensors).get_future()
def callback(fut):
cref.record_hook_fut_end(key)
return fut.wait()
return fut.then(callback)
def get_hook(self):
r"""
returns DdpTrainer.hook
"""
return DdpTrainer.hook
def create_ddp_model(self, model):
r"""
A method that creates the ddp_model and hook_state objects
and returns them.
Args:
model (nn.Module): neural network model
"""
ddp_model = DDP(
model, device_ids=[self.rank], process_group=self.process_group
)
hook_state = self.HookState(self, self.process_group)
ddp_model.register_comm_hook(hook_state, self.get_hook())
return ddp_model, hook_state
def create_criterion(self):
r"""
A method that creates a criterion for the training
algorithm.
"""
return nn.CrossEntropyLoss().cuda(self.rank)
def create_optimizer(self, parameters, lr):
r"""
A method that creates an optimizer for the training
algorithm.
Args:
parameters (iterable): iterable of parameters to optimize
lr (float): learning rate
"""
return torch.optim.SGD(parameters, lr)
def epoch_key(self, epoch, index):
r"""
A method that returns an encoded key that represents the current epoch and
iteration index.
Args:
epoch (int): epoch index
index (int): iteration index
"""
return f"{epoch},{index}"
def preprocess_data(self, data):
r"""
A method that moves the data from CPU to GPU.
Args:
data (list): training examples
"""
for i in range(len(data)):
data[i][0] = data[i][0].cuda(self.rank)
data[i][1] = data[i][1].cuda(self.rank)
return data
def iteration_step(self, ddp_model, criterion, optimizer, hook_state, epoch, index, batch):
r"""
A method that performs an iteration of training.
Args:
ddp_model (nn.Module): distributed data parallel model
criterion (nn.Module): loss function to measure model
optimizer (optim.Optimizer): updates model parameters
hook_state (object): ddp communication hook state object
epoch (int): index of pass through the data
index (int): zero-based iteration index within the current epoch
batch (list): training examples
"""
hook_state.next_batch()
input, target = batch[0], batch[1]
self.record_batch_start(self.epoch_key(epoch, index))
optimizer.zero_grad()
self.record_forward_start(self.epoch_key(epoch, index))
out = ddp_model(input)
self.record_forward_end(self.epoch_key(epoch, index))
loss = criterion(out, target)
self.record_backward_start(self.epoch_key(epoch, index))
loss.backward()
self.record_backward_end(self.epoch_key(epoch, index))
optimizer.step()
self.record_batch_end(self.epoch_key(epoch, index))
def train(self, model, data):
r"""
A method that implements the training algorithm.
Args:
model (nn.Module): neural network model
data (list): training examples
"""
model = model.cuda(self.rank)
data = self.preprocess_data(data)
ddp_model, hook_state = self.create_ddp_model(model)
criterion = self.create_criterion()
optimizer = self.create_optimizer(ddp_model.parameters(), 1e-4)
for epoch in range(self.epochs):
for index, batch in enumerate(data):
self.iteration_step(
ddp_model, criterion, optimizer, hook_state, epoch, index, batch
)
torch.cuda.synchronize(self.rank)

View File

@@ -6,27 +6,44 @@ from .TrainerBase import TrainerBase
class DdpTrainerBase(TrainerBase):
HOOK_FUTURE_METRIC = "hook_future_metric"
NCCL_ALLREDUCE = "nccl_allreduce"
GLOO_ALLREDUCE = "gloo_allreduce"
def __init__(self, rank):
r"""
Inits DdpTrainerBase class.
Args:
rank (int): worker rank
"""
super().__init__(rank)
@staticmethod
@abstractmethod
def hook(state, bucket):
r"""
A method to be implemented by child class that will implement a DDP
training algorithm.
Args:
state (object): maintains state during the training process
bucket (GradBucket): gradient bucket
"""
return
def record_hook_fut_start(self, key, name, cuda=True):
r"""
A helper method that records a hook future metric
for the given key. A user should call this before
sending an async request in the DDP communication hook.
Args:
key (str): unique id for metric within a group
name (str): description of the metric
cuda (bool): indicator to determine if this is a CUDA metric
"""
self.record_start(self.HOOK_FUTURE_METRIC, key, name, cuda)
def record_hook_fut_end(self, key):
r"""
A helper method that records a hook future metric
for the given key. A user should call this in a callback
attached to the future returned by an async request.
Args:
key (str): unique id for metric within a group
"""
self.record_end(self.HOOK_FUTURE_METRIC, key)
def bucket_to_parameters(self, bucket):
parameter_tensors = bucket.get_per_parameter_tensors()
parameter_tensors_count = len(parameter_tensors)
if parameter_tensors_count > 0:
return parameter_tensors
else:
return [bucket.get_tensor()]

View File

@@ -15,13 +15,29 @@ class TrainerBase(ABC):
BACKWARD = "backward"
def __init__(self, rank):
r"""
Inits TrainerBase class.
Args:
rank (int): worker rank
"""
self.__metrics_logger = MetricsLogger(rank)
@abstractmethod
def train(self):
r"""
A method to be implemented by child class that will train a neural network.
"""
return
def record_start(self, type, key, name, cuda=True):
r"""
A method that records the start event for a metric.
Args:
type (str): group id for metric
key (str): unique id for metric within a group
name (str): description of the metric
cuda (bool): indicator to determine if this is a CUDA metric
"""
self.__metrics_logger.record_start(
type,
key,
@@ -30,12 +46,26 @@ class TrainerBase(ABC):
)
def record_end(self, type, key):
r"""
A method that records the end event for a metric.
Args:
type (str): group id for metric
key (str): unique id for metric within a group
"""
self.__metrics_logger.record_end(
type,
key
)
def record_batch_start(self, key, cuda=True):
r"""
A helper method that records a batch metric for the
given key. A user should call this at the start of an
iteration step during training.
Args:
key (str): unique id for metric within a group
cuda (bool): indicator to determine if this is a CUDA metric
"""
self.__metrics_logger.record_start(
self.BATCH_LEVEL_METRIC,
key,
@@ -44,12 +74,27 @@ class TrainerBase(ABC):
)
def record_batch_end(self, key):
r"""
A helper method that records a batch metric for the
given key. A user should call this at the end of an
iteration step during training.
Args:
key (str): unique id for metric within a group
"""
self.__metrics_logger.record_end(
self.BATCH_LEVEL_METRIC,
key
)
def record_forward_start(self, key, cuda=True):
r"""
A helper method that records a forward metric
for the given key. A user should call this before
their neural network forward.
Args:
key (str): unique id for metric within a group
cuda (bool): indicator to determine if this is a CUDA metric
"""
self.__metrics_logger.record_start(
self.FORWARD_METRIC,
key,
@@ -58,12 +103,27 @@ class TrainerBase(ABC):
)
def record_forward_end(self, key):
r"""
A helper method that records a forward metric
for the given key. A user should call this after their
neural network forward.
Args:
key (str): unique id for metric within a group
"""
self.__metrics_logger.record_end(
self.FORWARD_METRIC,
key
)
def record_backward_start(self, key, cuda=True):
r"""
A helper method that records a backward metric
for the given key. A user should call this before
their .backward() call.
Args:
key (str): unique id for metric within a group
cuda (bool): indicator to determine if this is a CUDA metric
"""
self.__metrics_logger.record_start(
self.BACKWARD_METRIC,
key,
@ -72,6 +132,13 @@ class TrainerBase(ABC):
)
def record_backward_end(self, key):
r"""
A helper method that records a backward metric
for the given key. A user should call this after
.backward().
Args:
key (str): unique id for metric within a group
"""
self.__metrics_logger.record_end(
self.BACKWARD_METRIC,
key
@ -79,6 +146,13 @@ class TrainerBase(ABC):
@staticmethod
def methodmetric(name, type="method_metric", cuda=True):
r"""
A decorator that records a metric for the decorated method.
Args:
name (str): description of the metric
type (str): group id for metric
cuda (bool): indicator to determine if this is a CUDA metric
"""
def decorator(function):
@functools.wraps(function)
def wrapper(self, *args):
@ -91,7 +165,13 @@ class TrainerBase(ABC):
return decorator
def get_metrics(self):
r"""
A method that returns metrics captured by the __metrics_logger.
"""
return self.__metrics_logger.get_processed_metrics()
def clear_metrics(self):
r"""
A method that clears __metrics_logger recorded metrics.
"""
return self.__metrics_logger.clear_metrics()

View File

@@ -0,0 +1,67 @@
import torch
RPC_SPARSE = "rpc_sparse"
RPC_DENSE = "rpc_dense"
def sparse_tensor_to_rpc_format(sparse_tensor):
r"""
A helper method that creates a list containing the indices, values, and size
of a coalesced sparse tensor.
Args:
sparse_tensor (torch.Tensor): the sparse COO tensor to convert
"""
sparse_tensor = sparse_tensor.coalesce()
return [sparse_tensor.indices(), sparse_tensor.values(), sparse_tensor.size()]
def sparse_rpc_format_to_tensor(sparse_rpc_format):
r"""
A helper method that creates a sparse_coo_tensor from indices, values, and size.
Args:
sparse_rpc_format (list): sparse_coo_tensor represented as a list
"""
return torch.sparse_coo_tensor(
sparse_rpc_format[0], sparse_rpc_format[1], sparse_rpc_format[2]
).coalesce()
def process_bucket_with_remote_server(state, bucket):
r"""
Processes a gradient bucket passed by a DDP communication hook
during .backward(). The method supports processing sparse and dense
tensors. It records an RPC future completion time metric for the trainer.
Args:
state (object): maintains state during the training process
bucket (GradBucket): gradient bucket
"""
cref = state.cref
tensor = bucket.get_tensor()
if not cref.use_cuda_rpc:
tensor = tensor.cpu()
sparse = tensor.is_sparse
if sparse:
tensor = sparse_tensor_to_rpc_format(tensor)
b_index = bucket.get_index()
server_args = [
cref.server_rref,
state.batch_number,
b_index,
tensor
]
key = state.get_key(b_index)
cref.record_hook_fut_start(
key,
RPC_SPARSE if sparse else RPC_DENSE
)
fut = cref.server_rref.rpc_async().average_gradient(*server_args)
def callback(fut):
cref.record_hook_fut_end(key)
tensor = fut.wait()
if type(tensor) is list:
tensor = sparse_rpc_format_to_tensor(tensor)
tensor = tensor.cuda(cref.rank)
return [tensor]
return fut.then(callback)
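
The [indices, values, size] wire format used by these helpers can be round-tripped locally as a quick check. The snippet below assumes only PyTorch and re-creates the encode/decode steps inline rather than importing the benchmark's utils module.

# Round-trip check of the sparse RPC list format (encode/decode mirror the
# two helpers above).
import torch

indices = torch.tensor([[0, 1, 1], [2, 0, 2]])
values = torch.tensor([3.0, 4.0, 5.0])
grad = torch.sparse_coo_tensor(indices, values, (2, 3)).coalesce()

payload = [grad.indices(), grad.values(), grad.size()]  # what the trainer sends
restored = torch.sparse_coo_tensor(payload[0], payload[1], payload[2]).coalesce()

assert torch.equal(grad.to_dense(), restored.to_dense())  # lossless round trip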