pytorch/caffe2/python/data_parallel_model_test.py
Paul Jesse Hellemn · b875fb281c · Update from facebook (#7451) · 2018-05-10 23:14:27 -07:00
* [bootcamp] Improve "Shape" operator to support axes specification

Improve the "Shape" operator of Caffe2 to support x.shape(tensor, axes), which takes an optional int array "axes" as input. For example, x.shape(tensor, [1, 0]) returns the dimensions of axes 1 and 0, in the specified order. In the current version, the "axes" input allows duplicates and can have arbitrary length.
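
A minimal usage sketch (the `axes` argument name comes from this description; blob names are illustrative):

```python
import numpy as np
from caffe2.python import core, workspace

# A (2, 3, 5) tensor; Shape with axes=[1, 0] should return [3, 2].
workspace.FeedBlob("x", np.zeros((2, 3, 5), dtype=np.float32))
workspace.RunOperatorOnce(
    core.CreateOperator("Shape", ["x"], ["x_shape"], axes=[1, 0])
)
print(workspace.FetchBlob("x_shape"))  # expected: [3 2]
```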

* Back out "Add barrier net that runs before training nets"

Original commit changeset: b373fdc9c30f. Need additional changes to some callers to support barrier failures.

* Change warning to verbose log to reduce log spam

The `LOG(WARNING)` was a bit spammy for regular use, so let's just make it a `VLOG`.

* Extract the shared code from different caffe2_benchmark binaries

The OSS and internal benchmarks will share most of the benchmark's functions.

* Support MFR in sequence training

As titled.

* Make knowledge distillation work with using logged prediction feature as teacher label.

1) Add loading of raw dense features as the teacher label.
2) Add an optional calibration function for the teacher label.
3) Add the teacher label to the generic unit test.
4) Deprecate the TTSN workflow version that uses feature_options to configure the teacher label.

* [C2/CUDA]: unjoined cross entropy sigmoid

as desc

* Add async_scheduling executor into deferrable_net_exec_test

Add async_scheduling into tests and fix some exception cases

* Fix Event disabled error

When disabling events in RNN ops, make sure we don't call Finish on a disabled
event from the op's RunAsync.

* cuda ensure cpu output op can handle both TensorCPU and TensorCUDA

as desc.

* [C2 Core] Infer input device option in C2 hypothesis_test checkers

Improve how we choose the default device option for input blobs.
Previously an input defaulted to the device where the op lives, but that is not necessarily correct.

For example, CopyCPUToGPU takes a CPU input even though the op itself runs on the GPU.

* [C2 Op]SplitByLengthsOp CPU/GPU implementation

* Fix undefined symbol error

Not sure why we're getting an undefined symbol even with link_whole = True. We need to figure out why, but we need this workaround for now.

* Add tools in DAIPlayground platform to help debugging models

Add additional tools to allow Playground to override individual methods defined in AnyExp. This allows users to create modules that specifically change certain default method behaviors. An example included in this diff is deactivating the test model and checkpointing. When debugging model problems, switching off components helps quickly narrow down the location of a bug. The technique is extensively used in task T27038712 (Steady memory increase in EDPM, eventually resulting in gloo/cuda.cu:34: out of memory).

* add shape and type inference for int8 conversion operator

* Fix flaky test for group_norm

* Fix flaky group_norm_op_test

* Implementation of composite learning rate policy

In many state-of-the-art deep learning works, people use a simple trick to
schedule the learning rate: use a fixed learning rate until the error plateaus,
then switch to a different fixed learning rate, and so on. In this diff,
we implement a simple version of a composite learning rate: the user gives
a set of learning rate policies and corresponding iteration counts, and the
optimizer changes the learning rate policy based on the number of iterations so far.

For example, say the user gives two learning rate policies, FixedLearningRate
and PolyLearningRate, with an iteration count of 1k. Then for the first 1k iterations
we use FixedLearningRate, and for the following iterations we use PolyLearningRate.
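
A pure-Python sketch of the schedule logic described above (illustrative only, not the actual Caffe2 API or parameter names):

```python
def composite_lr(iter_num, sub_policies):
    """sub_policies: list of (num_iters, policy_fn) pairs; the last
    entry covers all remaining iterations."""
    start = 0
    for num_iters, policy_fn in sub_policies[:-1]:
        if iter_num < start + num_iters:
            # Each sub-policy sees a local iteration count starting at 0.
            return policy_fn(iter_num - start)
        start += num_iters
    return sub_policies[-1][1](iter_num - start)


def fixed(it):  # FixedLearningRate
    return 0.1


def poly(it):   # PolyLearningRate with max_iter=9000, power=1
    return 0.1 * (1.0 - it / 9000.0)


schedule = [(1000, fixed), (9000, poly)]
assert composite_lr(500, schedule) == 0.1   # first 1k iterations: fixed
assert composite_lr(1000, schedule) == 0.1  # afterwards: poly, from local iter 0
```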

* Split two use cases of CachedReader into two classes, DBFileReader and CachedReader

# Use Cases:

1). input: DB file -> output: DatasetReader.

Use DBFileReader.

2). input: Reader -> build cache DB file -> output: DatasetReader.

Use CachedReader.

# Changes to CachedReader:

1). Move db_path to the constructor, because with a mock reader the cache will always be built ahead of time.

# Changes to tests:

1). Make a separate TestCase class for CachedReader and DBFileReader.

2). Make it possible to add more test functions by adding setUp, tearDown and _make_temp_path.

3). Make deleting db_path more general: `db_path` could be a file for `log_file_db`, but could also be a directory for `leveldb`.
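
A sketch of the two resulting use cases (import paths and the db_type argument are assumptions, not verified against this diff):

```python
from caffe2.python.cached_reader import CachedReader
from caffe2.python.db_file_reader import DBFileReader

# Use case 1: input is an existing DB file -> read it directly.
db_reader = DBFileReader(db_path='/tmp/preproc.db', db_type='log_file_db')

# Use case 2: input is another Reader -> build the cache DB once,
# then read from it. `original_reader` is a placeholder for some
# upstream Reader instance.
cached_reader = CachedReader(
    original_reader,
    db_path='/tmp/preproc_cache.db',
    db_type='log_file_db',
)
```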

* Back out "On Mobile phones, call GlobalInit with no arguments in predictor in case we need to perform initialization"

Original commit changeset: 4489c6133f11

* Fix LARS bug

Fixed a bug in the LARS implementation which caused all subsequent blobs not using LARS to have the LARS learning rate multiplier applied to them.

* [tum] support sparse init & add uniformFill option

as title

* Propagate exception for async nets

Capture any exception thrown in async nets and re-throw it after wait(). This allows exceptions to propagate up to the caller.

This diff was a part of D7752068.  We split the diff so that C2 core files changes are in a separate diff.
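
A toy pure-Python analogy of the behavior (not the C2 executor itself): run work on a background thread, capture any exception, and re-raise it when the caller waits.

```python
import threading


class ToyAsyncNet:
    def __init__(self, fn):
        self._exc = None

        def runner():
            try:
                fn()
            except Exception as e:  # capture instead of swallowing
                self._exc = e

        self._thread = threading.Thread(target=runner)
        self._thread.start()

    def wait(self):
        self._thread.join()
        if self._exc is not None:
            raise self._exc  # propagate to the caller


def failing_op():
    raise RuntimeError("op failed")


net = ToyAsyncNet(failing_op)
try:
    net.wait()
except RuntimeError as e:
    print("caught:", e)  # caught: op failed
```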

* Automatic update of fbcode/onnx to 69894f207dfcd72d1e70497d387201cec327efbc

Previous import was 403ccfbd0161c38f0834413d790bad0874afbf9a

Included changes:
- **[69894f2](https://github.com/onnx/onnx/commit/69894f2)**: Use op schema.all tensor types in random like definitions (#865) <Scott McKay>
- **[b9d6b90](https://github.com/onnx/onnx/commit/b9d6b90)**: Clarify random like operators (#846) <Scott McKay>
- **[fc6b5fb](https://github.com/onnx/onnx/commit/fc6b5fb)**: Refactor shape inference implementation (#855) <anderspapitto>
- **[b7d8dc8](https://github.com/onnx/onnx/commit/b7d8dc8)**: fix cmake warning message (#863) <Eric S. Yu>
- **[f585c5d](https://github.com/onnx/onnx/commit/f585c5d)**: add pytorch-operator test for tile (#831) <Wenhao Hu>
- **[993fe70](https://github.com/onnx/onnx/commit/993fe70)**: add install step (#832) <Eric S. Yu>
- **[68bc26c](https://github.com/onnx/onnx/commit/68bc26c)**: add type inference for traditional ml ops except classifier ops. (#857) <Ke Zhang>
- **[9cc0cda](https://github.com/onnx/onnx/commit/9cc0cda)**: fix string representation of scalar types (#858) <G. Ramalingam>
- **[1078925](https://github.com/onnx/onnx/commit/1078925)**: fix y in pow test case to scalar (#852) <Wenhao Hu>
- **[c66fb6f](https://github.com/onnx/onnx/commit/c66fb6f)**: Add some math function shape inference (#845) <anderspapitto>
- **[ff667d1](https://github.com/onnx/onnx/commit/ff667d1)**: Refactor return type and docs for ONNXIFI_BACKEND_DIRECTX_ID (#853) <Marat Dukhan>
- **[11c6876](https://github.com/onnx/onnx/commit/11c6876)**: clear initializer names when clear initializer (#849) <Wenhao Hu>
- **[73c34ae](https://github.com/onnx/onnx/commit/73c34ae)**: Clarify FeatureVectorizer description. (#843) <Scott McKay>
- **[1befb9b](https://github.com/onnx/onnx/commit/1befb9b)**: Remove useless text in docs (#850) <Lu Fang>
- **[e84788f](https://github.com/onnx/onnx/commit/e84788f)**: Fix SELU attributes' default values (#839) <Lu Fang>
- **[ebac046](https://github.com/onnx/onnx/commit/ebac046)**: Add tile test case (#823) <Wenhao Hu>
- **[8b7a925](https://github.com/onnx/onnx/commit/8b7a925)**: a few more shape inference functions (#772) <anderspapitto>
- **[9718f42](https://github.com/onnx/onnx/commit/9718f42)**: Make the coefficient non optional for LinearClassifier (#836) <Jaliya Ekanayake>
- **[ef083d0](https://github.com/onnx/onnx/commit/ef083d0)**: Add save_tensor and load_tensor functions for Protos (#770) <Lu Fang>
- **[45ceb55](https://github.com/onnx/onnx/commit/45ceb55)**: Check if CMAKE_BUILD_TYPE set before project(). (#812) <Sergii Dymchenko>
- **[4b3d2b0](https://github.com/onnx/onnx/commit/4b3d2b0)**: [WIP] reenable shape inference tests (#834) <anderspapitto>
- **[22d17ee](https://github.com/onnx/onnx/commit/22d17ee)**: RNN tests: LSTM, GRU, SimpleRNN (#739) <Peyman Manikashani>
- **[de65b95](https://github.com/onnx/onnx/commit/de65b95)**: dimension denotation (#443) <Tian Jin>
- **[eccc76e](https://github.com/onnx/onnx/commit/eccc76e)**: fix field number issue in onnx operator proto and enable its build (#829) <Ke Zhang>
- **[d582beb](https://github.com/onnx/onnx/commit/d582beb)**: disable shape inference test to unbreak ci (#830) <Lu Fang>
- **[485b787](https://github.com/onnx/onnx/commit/485b787)**: function proto for composite op. (#802) <Ke Zhang>
- **[cd58928](https://github.com/onnx/onnx/commit/cd58928)**: specify defaults for attributes of Affine op (#820) <G. Ramalingam>
- **[7ee2cf9](https://github.com/onnx/onnx/commit/7ee2cf9)**: merge the dummy backend back into the main one (#743) <anderspapitto>
- **[1c03a5a](https://github.com/onnx/onnx/commit/1c03a5a)**: [Proposal] ONNX Interface for Framework Integration (previously ONNX Backend API) header and docs (#551) <Marat Dukhan>
- **[3769a98](https://github.com/onnx/onnx/commit/3769a98)**: Rename real model test case from VGG-16 to ZFNet (#821) <Lu Fang>

* [C2]ReluN Op

relu n op.

tf reference: https://www.tensorflow.org/api_docs/python/tf/nn/relu6
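
NumPy sketch of the op's semantics (not the C2 kernel): ReluN clamps activations to [0, n]; with n = 6 this matches tf.nn.relu6.

```python
import numpy as np


def relu_n(x, n=6.0):
    return np.minimum(np.maximum(x, 0.0), n)


print(relu_n(np.array([-1.0, 3.0, 9.0])))  # [0. 3. 6.]
```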

* Call destructor when assigning a blob value

* Add executor overrides

Add executor overrides flag to enable migration to async_scheduling executor

* Add barrier net that runs before training nets - attempt #2

Add a synchronization barrier net that is run before the training nets. With this net, faster shards will wait for the other shards before starting training, which reduces the chance of the faster shards timing out during Gloo AllReduce.
Removed the explicit data_parallel_model.py synchronize call in the holmes workflow.

This change was landed previously but caused errors for some EDPM workflows - see https://fb.facebook.com/groups/1426530000692545/permalink/1906766366002237/ - because EDPM assumes that any call to CreateOrCloneCommonWorld and the Gloo ops is wrapped in exception handlers, but in this case an exception thrown in the barrier init net was not handled.

To address this issue, we add _CreateOrCloneCommonWorld to the param_init_net instead of to a new barrier init net. Since errors in the param_init_net run are handled gracefully with a re-rendezvous, this should fix the problem.

* Handle empty nets in async_scheduling

Make sure we don't get stuck on empty nets

* use CUDA_ARCH for conditional compile

* [C2 fix] infer function for ensure_cpu_output_op

* Update group_norm test to reduce flaky test

* Fix lr_multiplier for GPU

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from future.utils import viewkeys
from multiprocessing import Process, Queue
import numpy as np
import os
import shutil
import tempfile
import unittest
import time
from mock import Mock
from hypothesis import assume, given
import hypothesis.strategies as st

from caffe2.proto import caffe2_pb2
from caffe2.python import brew, core, cnn, data_parallel_model, dyndep, \
    model_helper, optimizer, rnn_cell, workspace
from caffe2.python.test_util import TestCase

dyndep.InitOpsLibrary("@/caffe2/caffe2/distributed:file_store_handler_ops")


class TemporaryDirectory:
    def __enter__(self):
        self.tmpdir = tempfile.mkdtemp()
        return self.tmpdir

    def __exit__(self, type, value, traceback):
        shutil.rmtree(self.tmpdir)


# Note(jiayq): we are yet to find out why Travis gives out an error in gloo
# like:
# RuntimeError: [enforce fail at /home/travis/build/caffe2/caffe2/third_party/gloo/gloo/transport/tcp/device.cc:113] ifa != nullptr. Unable to find interface for: [127.0.1.1]
# See for example https://travis-ci.org/caffe2/caffe2/jobs/262433866
# As a result, we will check if this is travis, and if yes, disable it.
@unittest.skipIf(os.environ.get("TRAVIS"), "DPMTest has a known issue with Travis.")
class DataParallelModelTest(TestCase):

    def run_model(self, devices, gpu):
        '''
        Helper function for test_equiv
        '''
        def input_builder_fun(model):
            return None

        def model_build_fun(model, loss_scale):
            fc = model.FC("data", "fc", 16, 1,
                          ("ConstantFill", {}), ("ConstantFill", {}))
            fc_fl = model.FlattenToVec(fc, "fc_fl")
            sigm = model.Sigmoid(fc_fl, "sigm")
            sq = model.SquaredL2Distance([sigm, "label"], "sq")
            loss = model.AveragedLoss(sq, "loss")
            loss = model.Scale(loss, scale=loss_scale)

            # For testing explicit sync
            model.param_init_net.UniformFill([], ["sync_num"], shape=[1])
            return [loss]

        def add_optimizer(model):
            return optimizer.build_sgd(
                model,
                0.1,
                policy="fixed",
                max_gradient_norm=5.0,
                allow_lr_injection=True,
            )

        workspace.ResetWorkspace()
        model = cnn.CNNModelHelper(
            order="NHWC",
            name="test{}".format(devices),
        )
        data_parallel_model.Parallelize(
            model,
            input_builder_fun=input_builder_fun,
            forward_pass_builder_fun=model_build_fun,
            optimizer_builder_fun=add_optimizer,
            devices=devices,
            cpu_device=not gpu,
            shared_model=not gpu,
            combine_spatial_bn=not gpu,
        )
        data_parallel_model.AddBlobSync(model, ["sync_num"])

        # Light test for LR names
        lr_names = data_parallel_model.GetLearningRateBlobNames(model)
        self.assertGreater(len(lr_names), 0)

        np.random.seed(2603)

        # Each run has same input, independent of number of gpus
        batch_size = 64
        for i in range(0, 10):
            full_data = np.random.rand(batch_size, 16)
            full_labels = np.round(full_data[:, 0])
            batch_per_device = batch_size // len(devices)

            for (j, g) in enumerate(devices):
                st = j * batch_per_device
                en = st + batch_per_device
                data = full_data[st:en, :].astype(np.float32)
                labels = full_labels[st:en].astype(np.float32)
                with core.DeviceScope(core.DeviceOption(model._device_type, g)):
                    workspace.FeedBlob(
                        "{}_{}/data".format(model._device_prefix, g), data
                    )
                    workspace.FeedBlob(
                        "{}_{}/label".format(model._device_prefix, g), labels
                    )

            if i == 0:
                workspace.RunNetOnce(model.param_init_net)
                workspace.CreateNet(model.net)

            workspace.FeedBlob(
                model._device_prefix + "_0/sync_num",
                np.array([i * 2]).astype(np.float32),
                device_option=core.DeviceOption(model._device_type, 0))
            workspace.RunNet(model.net.Proto().name)

            # Test AddBlobSync
            for j in model._devices:
                sync = workspace.FetchBlob(
                    model._device_prefix + "_{}/sync_num".format(j))[0]
                self.assertTrue(abs(sync - i * 2) < 0.01)

        return workspace.FetchBlob("{}_0/fc_w".format(model._device_prefix))

    def run_test_locally(self, fn, device_option=None, **kwargs):
        # Queue for assertion errors on subprocesses
        queue = Queue()

        # Capture any exception thrown by the subprocess
        def run_fn(*args, **kwargs):
            try:
                if device_option is None:
                    fn(*args, **kwargs)
                    workspace.ResetWorkspace()
                else:
                    with core.DeviceScope(device_option):
                        fn(*args, **kwargs)
                        workspace.ResetWorkspace()
            except Exception as ex:
                queue.put(ex)

        # Start N processes in the background
        procs = []
        for i in range(kwargs['comm_size']):
            kwargs['comm_rank'] = i
            proc = Process(
                target=run_fn,
                kwargs=kwargs)
            proc.start()
            procs.append(proc)

        # Test complete, join background processes
        while len(procs) > 0:
            proc = procs.pop(0)
            while proc.is_alive():
                proc.join(1)

            # Raise exception if we find any.
            # Note that the following is executed ALSO after
            # the last process was joined, so if ANY exception
            # was raised, it will be re-raised here.
            if not queue.empty():
                raise queue.get()

    def test_equiv(self):
        '''
        Test that the model produces exactly same results given
        total batchsize, independent of number of GPUs.
        '''
        for gpu in [True, False]:
            if gpu and (not workspace.has_gpu_support or
                        workspace.NumCudaDevices() < 2):
                continue
            result_2gpus = self.run_model([0, 1], gpu=gpu)
            result_1gpus = self.run_model([0], gpu=gpu)

            self.assertTrue(np.allclose(result_1gpus, result_2gpus))

            if not gpu or workspace.NumCudaDevices() >= 4:
                result_4gpus = self.run_model(list(range(4)), gpu=gpu)
                self.assertTrue(np.allclose(result_1gpus, result_4gpus))

            if not gpu or workspace.NumCudaDevices() >= 8:
                result_8gpus = self.run_model(list(range(8)), gpu=gpu)
                self.assertTrue(np.allclose(result_1gpus, result_8gpus))

            if not gpu or workspace.NumCudaDevices() >= 16:
                result_16gpus = self.run_model(list(range(16)), gpu=gpu)
                self.assertTrue(np.allclose(result_1gpus, result_16gpus))

    def test_checkpoint_params(self):
        def add_input_ops(model):
            pass

        def add_model_ops(model, loss_scale):
            model.NHWC2NCHW("data", "data_nchw")
            model.Conv("data_nchw", 'conv1', 3, 64,
                       weight_init=("MSRAFill", {}), kernel=7,
                       stride=2, pad=3, no_bias=0)
            model.SpatialBN('conv1', 'conv1_spatbn_relu', 64, epsilon=1e-3, is_test=False)
            model.Relu('conv1_spatbn_relu', 'conv1_spatbn_relu')
            model.MaxPool('conv1_spatbn_relu', 'pool1', kernel=3, stride=2)
            model.FC('pool1', 'fc', dim_in=(64 * 56 * 56), dim_out=100)
            model.Sigmoid('fc', 'fc_sigm')
            model.Softmax('fc_sigm', 'softmax')
            model.LabelCrossEntropy(['softmax', 'label'], 'xent')
            loss = model.AveragedLoss('xent', 'loss')

            # Add a duplicate param init to ensure it does not cause issues
            model.param_init_net.ConstantFill(
                [], ["fc_w"], shape=((64 * 56 * 56), 1000)
            )
            return [loss]

        def add_optimizer(model):
            optimizer.build_sgd(model, 0.1, policy="fixed", momentum=0.9)

        model = cnn.CNNModelHelper(
            order="NHWC",
            name="test",
        )
        data_parallel_model.Parallelize_CPU(
            model,
            input_builder_fun=add_input_ops,
            forward_pass_builder_fun=add_model_ops,
            optimizer_builder_fun=add_optimizer,
            devices=[1, 2, 3],
        )

        # Only gpu_1 params should be returned (gpu_1 is the first gpu)
        checkpoint_params = data_parallel_model.GetCheckpointParams(model)

        for p in model.GetParams("cpu_1/"):
            self.assertTrue(p in checkpoint_params)
            self.assertTrue(p + "_momentum" in checkpoint_params)
        for p in model.GetParams("cpu_2/"):
            self.assertFalse(p in checkpoint_params)
        self.assertTrue(
            core.BlobReference("cpu_1/fc_w_momentum") in checkpoint_params)
        for c in model.GetComputedParams("cpu_1/"):
            self.assertTrue(c in checkpoint_params)
        for c in model.GetComputedParams("cpu_2/"):
            self.assertFalse(c in checkpoint_params)
        self.assertFalse(core.BlobReference("cpu_1/data") in checkpoint_params)
        self.assertTrue(core.BlobReference("optimizer_iteration") in checkpoint_params)

    def test_net_conversion_and_append_net(self):
        other = model_helper.ModelHelper()
        fc1 = brew.fc(other, "data", "other_fc1", dim_in=3*227*227, dim_out=10)
        fc2 = brew.fc(other, fc1, "other_fc2", dim_in=10, dim_out=10)
        brew.fc(other, fc2, "other_fc3", dim_in=10, dim_out=10)

        def add_input_ops(model):
            model.net.UniformFill([], ["data"], shape=[4, 227, 227, 3])
            model.net.UniformFill([], ["label"], shape=[4])

        def add_model_ops(model, loss_scale):
            model.NHWC2NCHW("data", "data_nchw")
            model.Conv("data_nchw", 'conv1', 3, 64,
                       weight_init=("MSRAFill", {}), kernel=7,
                       stride=2, pad=3, no_bias=0)
            model.SpatialBN('conv1', 'conv1_spatbn_relu', 64, epsilon=1e-3, is_test=False)
            model.Relu('conv1_spatbn_relu', 'conv1_spatbn_relu')
            model.MaxPool('conv1_spatbn_relu', 'pool1', kernel=3, stride=2)
            model.FC('pool1', 'fc', dim_in=(64 * 56 * 56), dim_out=10)

            # Append the net and param_init_net of the other model
            appendnet = data_parallel_model.ConvertNetForDevice(other.net)
            model.net.AppendNet(appendnet)
            model.param_init_net.AppendNet(
                data_parallel_model.ConvertNetForDevice(other.param_init_net))

            model.Sigmoid('fc', 'fc_sigm')
            model.Softmax('fc_sigm', 'softmax')
            loss = model.AveragedLoss('softmax', 'loss')
            return [loss]

        def add_optimizer(model):
            optimizer.build_sgd(model, 0.1, policy="fixed", momentum=0.9)

        model = cnn.CNNModelHelper(
            order="NCHW",
            name="test",
        )
        data_parallel_model.Parallelize_CPU(
            model,
            input_builder_fun=add_input_ops,
            forward_pass_builder_fun=add_model_ops,
            optimizer_builder_fun=add_optimizer,
            devices=range(4)
        )

        # Just create and run net and confirm no exception is thrown
        workspace.RunNetOnce(model.param_init_net)
        workspace.CreateNet(model.net)
        workspace.RunNet(model.net)

    def test_synchronization_barrier(self):
        def run(comm_rank, comm_size, tmpdir):
            def add_input_ops(model):
                pass

            def add_model_ops(model, loss_scale):
                return []

            def add_optimizer(model):
                pass

            store_handler = "store_handler"
            workspace.RunOperatorOnce(
                core.CreateOperator(
                    "FileStoreHandlerCreate",
                    [],
                    [store_handler],
                    path=tmpdir))
            rendezvous = dict(
                kv_handler=store_handler,
                shard_id=comm_rank,
                num_shards=comm_size,
                engine='GLOO',
            )

            model = cnn.CNNModelHelper(
                order="NHWC",
                name="test",
            )
            data_parallel_model.Parallelize_CPU(
                model,
                input_builder_fun=add_input_ops,
                forward_pass_builder_fun=add_model_ops,
                optimizer_builder_fun=add_optimizer,
                devices=[1, 2, 3],
                rendezvous=rendezvous
            )
            data_parallel_model.RunInitNet(model)

            for _ in range(2):
                data_parallel_model.Synchronize(model)

        with TemporaryDirectory() as tmpdir:
            self.run_test_locally(
                run,
                comm_size=2,
                device_option=None,
                tmpdir=tmpdir)

    def test_pre_train_synchronization_barrier(self):
        def run(comm_rank, comm_size, tmpdir):
            def add_input_ops(model):
                pass

            def add_model_ops(model, loss_scale):
                return []

            def add_optimizer(model):
                pass

            workspace.ResetWorkspace()
            store_handler = "store_handler"
            workspace.RunOperatorOnce(
                core.CreateOperator(
                    "FileStoreHandlerCreate",
                    [],
                    [store_handler],
                    path=tmpdir))
            rendezvous = dict(
                kv_handler=store_handler,
                shard_id=comm_rank,
                num_shards=comm_size,
                engine='GLOO',
            )

            model = cnn.CNNModelHelper(
                order="NHWC",
                name="test",
            )
            # Set network timeout to 2 seconds, and add a 3 seconds
            # sleep for 1 host. Make sure there is no timeout on the
            # second RunNet.
            data_parallel_model._DEFAULT_TIMEOUT_SEC = 2
            data_parallel_model.Parallelize_CPU(
                model,
                input_builder_fun=add_input_ops,
                forward_pass_builder_fun=add_model_ops,
                optimizer_builder_fun=add_optimizer,
                devices=[1, 2, 3],
                rendezvous=rendezvous,
                barrier_net_timeout_sec=5
            )
            data_parallel_model.RunInitNet(model)
            data_parallel_model.RunNet(model, 2)
            if comm_rank == 0:
                time.sleep(data_parallel_model._DEFAULT_TIMEOUT_SEC)
            data_parallel_model.RunNet(model, 2)

        with TemporaryDirectory() as tmpdir:
            self.run_test_locally(
                run,
                comm_size=2,
                device_option=None,
                tmpdir=tmpdir)

    def test_device_scope_check(self):
        with self.assertRaises(AssertionError):
            with core.DeviceScope(core.DeviceOption(caffe2_pb2.CUDA, 0)):
                data_parallel_model.Parallelize_GPU(None, None, None)

    def test_net_transformer_function(self):
        devices = [1, 2, 3]

        def add_input_ops(model):
            model.param_init_net.UniformFill([], ["data"], shape=[32, 8])

        def add_optimizer(model):
            optimizer.build_sgd(model, 0.1)

        def add_model_ops(model, loss_scale):
            fc1 = brew.fc(model, "data", "fc1", dim_in=8, dim_out=8)
            return [fc1]

        kwargs = {
            'input_builder_fun': add_input_ops,
            'forward_pass_builder_fun': add_model_ops,
            'devices': devices,
        }

        # assert that the transformer is called for both train and test cases
        transform = Mock()
        kwargs['net_transformer_fun'] = transform
        model = model_helper.ModelHelper(name="r", init_params=False)
        data_parallel_model.Parallelize_CPU(model, **kwargs)
        self.assertTrue(transform.called)
        self.assertEqual(transform.call_count, 1)

        transform = Mock()
        kwargs['net_transformer_fun'] = transform
        kwargs['optimizer_builder_fun'] = add_optimizer
        model = model_helper.ModelHelper(name="r", init_params=True)
        data_parallel_model.Parallelize_CPU(model, **kwargs)
        self.assertTrue(transform.called)
        self.assertEqual(transform.call_count, 1)


class RecurrentNetworkParallelTest(TestCase):

    def run_model(self, devices, gpu):
        '''
        Helper function for test_equiv
        '''
        def input_builder_fun(model):
            return None

        def model_build_fun(model, loss_scale):
            workspace.FeedBlob(
                core.ScopedBlobReference("seq_lengths"),
                np.array([self.T] * self.batch_per_device, dtype=np.int32)
            )
            model.param_init_net.ConstantFill(
                [],
                "hidden_init",
                value=0.0,
                shape=[1, self.batch_per_device, self.hidden_dim]
            )
            model.param_init_net.ConstantFill(
                [],
                "cell_init",
                value=0.0,
                shape=[1, self.batch_per_device, self.hidden_dim]
            )

            output, _last_hidden, _, _last_state, = rnn_cell.LSTM(
                model=model,
                input_blob="data",
                seq_lengths="seq_lengths",
                initial_states=("hidden_init", "cell_init"),
                dim_in=self.input_dim,
                dim_out=self.hidden_dim,
                scope="partest",
            )

            # A silly loss function
            loss = model.AveragedLoss(
                model.Sub([output, "target"], "dist"),
                "loss",
            )
            loss = model.Scale(loss, "loss_scaled", scale=loss_scale)
            return [loss]

        def param_update_fun(model):
            ITER = model.Iter("ITER")
            LR = model.net.LearningRate(
                [ITER],
                "LR",
                base_lr=(-0.1),
                policy="fixed",
            )
            ONE = model.param_init_net.ConstantFill(
                [], "ONE", shape=[1], value=1.0,
            )
            for param in model.GetParams():
                param_grad = model.param_to_grad[param]
                model.WeightedSum([param, ONE, param_grad, LR], param)

            assert len(model.GetParams()) == len(model.params) // len(model._devices)

        workspace.ResetWorkspace()
        model = cnn.CNNModelHelper(
            name="recurrent_test{}".format(devices),
        )

        self.T = 8
        self.batch_size = 64
        self.input_dim = 8
        self.hidden_dim = 31
        self.batch_per_device = self.batch_size // len(devices)

        data_parallel_model.Parallelize(
            model,
            input_builder_fun=input_builder_fun,
            forward_pass_builder_fun=model_build_fun,
            param_update_builder_fun=param_update_fun,
            devices=devices,
            optimize_gradient_memory=True,
            cpu_device=not gpu,
        )

        # Change all initialization to be ConstantFills so that
        # everything is deterministic
        for op in model.param_init_net.Proto().op:
            if op.type.endswith('Fill'):
                op.type = 'ConstantFill'

        # Each run has same input, independent of number of gpus
        np.random.seed(20150210)
        for i in range(0, 10):
            full_data = np.random.rand(self.T, self.batch_size, self.input_dim)
            full_target = np.random.rand(
                self.T, self.batch_size, self.hidden_dim
            )

            for (j, g) in enumerate(devices):
                st = j * self.batch_per_device
                en = st + self.batch_per_device
                data = full_data[:, st:en, :].astype(np.float32)
                targets = full_target[:, st:en, :].astype(np.float32)
                with core.DeviceScope(core.DeviceOption(model._device_type, g)):
                    workspace.FeedBlob(
                        "{}_{}/data".format(model._device_prefix, g), data
                    )
                    workspace.FeedBlob(
                        "{}_{}/target".format(model._device_prefix, g), targets
                    )

            if i == 0:
                workspace.RunNetOnce(model.param_init_net)
                workspace.CreateNet(model.net)

            workspace.RunNet(model.net.Proto().name)

        return workspace.FetchBlob("{}_0/partest/i2h_w".format(model._device_prefix))

    def test_equiv_recurrent(self):
        '''
        Test that the model produces exactly same results given
        total batchsize, independent of number of GPUs/CPUs.
        '''
        for gpu in [True, False]:
            if gpu and not workspace.has_gpu_support:
                continue
            result_2gpus = self.run_model([0, 1], gpu)
            result_1gpus = self.run_model([0], gpu)

            self.assertTrue(np.allclose(result_1gpus, result_2gpus))

            if not gpu or workspace.NumCudaDevices() >= 4:
                result_4gpus = self.run_model(list(range(4)), gpu)
                self.assertTrue(np.allclose(result_1gpus, result_4gpus))

            if not gpu or workspace.NumCudaDevices() >= 8:
                result_8gpus = self.run_model(list(range(8)), gpu)
                self.assertTrue(np.allclose(result_1gpus, result_8gpus))


@unittest.skipIf(not workspace.has_gpu_support, "No gpu support.")
@unittest.skipIf(workspace.NumCudaDevices() < 2, "Need at least 2 GPUs.")
class SparseDataParallelModelTest(TestCase):
    '''
    Create and run the model. We try with both storing indices for gather
    on CPU and on GPU
    '''
    def run_model(self, V, gpu_devices, cpu_indices):

        def input_builder_fun(model):
            return None

        def model_build_fun(model, loss_scale):
            if cpu_indices:
                with core.DeviceScope(core.DeviceOption(caffe2_pb2.CPU)):
                    gathered_cpu = model.net.Gather(
                        [self.vecs, 'indices'], 'gathered_cpu')

                gathered = model.CopyCPUToGPU(gathered_cpu, "gathered")
            else:
                gpu_vecs = model.param_init_net.CopyCPUToGPU(
                    self.vecs, "gpuvecs",
                )
                model.params.append(gpu_vecs)
                gathered = model.net.Gather([gpu_vecs, 'indices'], 'gathered')
            flattened = model.Flatten(gathered, "flattened")
            fc = model.FC(flattened, "fc", 16 * 16, 1,
                          ("ConstantFill", {}), ("ConstantFill", {}))
            fc_fl = model.FlattenToVec(fc, "fc_fl")
            sigm = model.Sigmoid(fc_fl, "sigm")
            sq = model.SquaredL2Distance([sigm, "label"], "sq")
            loss = model.AveragedLoss(sq, "loss")
            loss = model.Scale(loss, scale=loss_scale)
            return [loss]

        def param_update_fun(model):
            ONE = model.param_init_net.ConstantFill(
                [], "ONE", shape=[1], value=1.0,
            )
            LR = model.CopyCPUToGPU(self.LR, "LR")
            for param in model.GetParams():
                param_grad = model.param_to_grad[param]
                if not isinstance(param_grad, core.GradientSlice):
                    model.WeightedSum([param, ONE, param_grad, LR], param)
                else:
                    param_momentum = model.param_init_net.ConstantFill(
                        [param],
                        param + '_momentum',
                        value=0.0,
                    )
                    model.net.SparseMomentumSGDUpdate(
                        [
                            param_grad.values,
                            param_momentum,
                            LR,
                            param,
                            param_grad.indices,
                        ],
                        [
                            param_grad.values, param_momentum, param
                        ],
                        momentum=0.1,
                        nesterov=0,
                    )

        workspace.ResetWorkspace()
        model = cnn.CNNModelHelper(
            order="NHWC",
            name="sparse_test{}".format(gpu_devices),
        )

        with core.NameScope("cpu"):
            with core.DeviceScope(core.DeviceOption(caffe2_pb2.CPU)):
                self.ITER = model.Iter("ITER")
                self.LR = model.net.LearningRate(
                    [self.ITER],
                    "LR",
                    base_lr=(-0.1),
                    policy="fixed",
                )
                self.vecs = model.param_init_net.UniformFill(
                    [], "vecs", shape=[V, 16])
                if cpu_indices:
                    model.params.append(self.vecs)
                self.ONE_CPU = model.param_init_net.ConstantFill(
                    [], "ONE_CPU", shape=[1], value=1.0,
                )

        data_parallel_model.Parallelize_GPU(
            model,
            input_builder_fun=input_builder_fun,
            forward_pass_builder_fun=model_build_fun,
            param_update_builder_fun=param_update_fun,
            devices=gpu_devices,
        )

        # Update the vecs
        if cpu_indices:
            with core.NameScope("cpu"):
                with core.DeviceScope(core.DeviceOption(caffe2_pb2.CPU)):
                    for param in model.GetParams():
                        param_grad = model.param_to_grad[param]
                        model.ScatterWeightedSum([param, self.ONE_CPU,
                                                  param_grad.indices,
                                                  param_grad.values,
                                                  self.LR],
                                                 self.vecs)
        else:
            with core.DeviceScope(core.DeviceOption(caffe2_pb2.CUDA, 0)):
                model.CopyGPUToCPU("gpu_0/gpuvecs", self.vecs)

        np.random.seed(2603)

        # Each run has same input, independent of number of gpus
        batch_size = 64
        for i in range(0, 10):
            full_indices = np.random.permutation(V)[:batch_size * 16].reshape(
                batch_size, 16
            )
            full_labels = full_indices[:, 0] % 2
            batch_per_device = batch_size // len(gpu_devices)

            for (j, g) in enumerate(gpu_devices):
                st = j * batch_per_device
                en = st + batch_per_device
                indices = full_indices[st:en, :].astype(np.int32)
                labels = full_labels[st:en].astype(np.float32)

                device_for_indices = core.DeviceOption(caffe2_pb2.CPU)
                if not cpu_indices:
                    device_for_indices = core.DeviceOption(caffe2_pb2.CUDA, g)

                with core.DeviceScope(device_for_indices):
                    workspace.FeedBlob("gpu_{}/indices".format(g), indices)

                with core.DeviceScope(core.DeviceOption(caffe2_pb2.CUDA, g)):
                    workspace.FeedBlob("gpu_{}/label".format(g), labels)

            if i == 0:
                workspace.RunNetOnce(model.param_init_net)
                # Force vecs to be same on all runs
                orig_vecs = np.random.rand(V, 16).astype(np.float32)
                workspace.FeedBlob(
                    self.vecs,
                    orig_vecs
                )
                if not cpu_indices:
                    for g in gpu_devices:
                        workspace.FeedBlob(
                            "gpu_{}/gpuvecs".format(g),
                            orig_vecs,
                            device_option=core.DeviceOption(caffe2_pb2.CUDA, g),
                        )
                workspace.CreateNet(model.net)

            workspace.RunNet(model.net.Proto().name)
            if len(gpu_devices) == 2:
                if not cpu_indices:
                    idx = workspace.FetchBlob("gpu_0/indices")
                    idx = list(idx.flatten())
                    n = len(idx)
                    nu = len(set(idx))
                    assert n == nu, "We cannot have duplicate indices"

        # Sanity check to see the vecs were updated
        self.assertFalse(
            np.allclose(workspace.FetchBlob(self.vecs), orig_vecs))
        return [workspace.FetchBlob(self.vecs if cpu_indices else "gpu_0/gpuvecs"),
                workspace.FetchBlob("gpu_0/fc_w")]

    def _test_equiv_sparse(self, cpu_indices):
        '''
        Test that the model produces exactly same results given
        total batchsize, independent of number of GPUs.
        '''
        V = 10000
        result_2gpus = self.run_model(V, [0, 1], cpu_indices)
        result_1gpus = self.run_model(V, [0], cpu_indices)

        self.assertTrue(np.allclose(result_1gpus[0], result_2gpus[0]))
        self.assertTrue(np.allclose(result_1gpus[1], result_2gpus[1]))

        if workspace.NumCudaDevices() >= 4:
            result_4gpus = self.run_model(V, list(range(4)), cpu_indices)
            self.assertTrue(np.allclose(result_1gpus[0], result_4gpus[0]))
            self.assertTrue(np.allclose(result_1gpus[1], result_4gpus[1]))

        if workspace.NumCudaDevices() >= 8:
            result_8gpus = self.run_model(V, list(range(8)), cpu_indices)
            self.assertTrue(np.allclose(result_1gpus[0], result_8gpus[0]))
            self.assertTrue(np.allclose(result_1gpus[1], result_8gpus[1]))

    def test_equiv_sparse(self):
        self._test_equiv_sparse(True)
        self._test_equiv_sparse(False)


@unittest.skipIf(workspace.NumCudaDevices() < 2, "Need at least 2 GPUs.")
class ParallelizeBMUFTest(TestCase):

    def _run_model(self, gpu_devices):
        '''
        Helper function for test_equiv
        '''
        def input_builder_fun(model):
            return None

    def _model_build_fun(self, model, loss_scale):
        fc = model.FC(
            "data", "fc", 16, 1, ("ConstantFill", {}), ("ConstantFill", {})
        )
        fc_fl = model.FlattenToVec(fc, "fc_fl")
        sigm = model.Sigmoid(fc_fl, "sigm")
        sq = model.SquaredL2Distance([sigm, "label"], "sq")
        loss = model.AveragedLoss(sq, "loss")
        loss = model.Scale(loss, scale=loss_scale)

        return [loss]

    def _param_update_fun(self, model):
        ITER = model.Iter("ITER")
        LR = model.net.LearningRate(
            [ITER],
            "LR",
            base_lr=(-0.1),
            policy="fixed",
        )
        ONE = model.param_init_net.ConstantFill(
            [], "ONE", shape=[1], value=1.0,
        )
        for param in model.GetParams():
            grad = model.param_to_grad[param]
            model.WeightedSum([param, ONE, grad, LR], param)

    def _generate_data(self, devices, device_type, device_prefix):
        np.random.seed(26)
        # Each run has same input, independent of number of gpus
        batch_size = 64
        for _ in range(0, 10):
            full_data = np.random.rand(batch_size, 16)
            full_labels = np.round(full_data[:, 0])
            batch_per_device = batch_size // len(devices)

            for (j, g) in enumerate(devices):
                st = j * batch_per_device
                en = st + batch_per_device
                data = full_data[st:en, :].astype(np.float32)
                labels = full_labels[st:en].astype(np.float32)
                with core.DeviceScope(core.DeviceOption(device_type, g)):
                    workspace.FeedBlob("{}_{}/data".format(device_prefix, g), data)
                    workspace.FeedBlob("{}_{}/label".format(device_prefix, g), labels)

    @given(
        cpu_device=st.booleans()
    )
    def test_parallelize_bmuf(self, cpu_device):
        assume(cpu_device or workspace.has_gpu_support)

        workspace.ResetWorkspace()
        model = cnn.CNNModelHelper(
            order="NHWC",
            name="test"
        )
        devices = [0, 1]

        def input_builder_fun(model):
            return None

        if not cpu_device:
            device_type = caffe2_pb2.CUDA
            device_prefix = "gpu"
        else:
            device_type = caffe2_pb2.CPU
            device_prefix = "cpu"
        self._generate_data(devices, device_type, device_prefix)

        data_parallel_model.Parallelize_BMUF(
            model,
            input_builder_fun,
            self._model_build_fun,
            self._param_update_fun,
            devices=devices,
            cpu_device=cpu_device
        )

        data_parallel_model.RunInitNet(model)

        # Check initial momentum params are zeros
        self.assertEqual(
            list(viewkeys(model._device_grouped_blobs)), ['fc_w', 'fc_b']
        )
        self.assertEqual(workspace.FetchBlob('{}_0/fc_b_v'.format(device_prefix)), 0)
        np.testing.assert_equal(
            workspace.FetchBlob('{}_0/fc_w_v'.format(device_prefix)),
            np.zeros(16).astype(np.float32).reshape(1, 16)
        )

        # Run the algorithm for one iteration to have non-zero params.
        data_parallel_model.RunNet(model, 1)

        # Save iteration momentum and post local update params
        v_b_ = workspace.FetchBlob('{}_0/fc_b_v'.format(device_prefix))
        v_w_ = workspace.FetchBlob('{}_0/fc_w_v'.format(device_prefix))

        workspace.RunNetOnce(model.net)

        b_0_ = workspace.FetchBlob('{}_0/fc_b'.format(device_prefix))
        w_0_ = workspace.FetchBlob('{}_0/fc_w'.format(device_prefix))
        b_1_ = workspace.FetchBlob('{}_1/fc_b'.format(device_prefix))
        w_1_ = workspace.FetchBlob('{}_1/fc_w'.format(device_prefix))

        # Compute block gradients.
        b_g_ = workspace.FetchBlob('{}_0/fc_b_g'.format(device_prefix))
        w_g_ = workspace.FetchBlob('{}_0/fc_w_g'.format(device_prefix))
        workspace.RunNetOnce(model._global_model_param_updates_net)

        g_b = (b_0_ + b_1_) / 2 - b_g_
        g_w = (w_0_ + w_1_) / 2 - w_g_

        v_b = workspace.FetchBlob('{}_0/fc_b_v'.format(device_prefix))
        v_w = workspace.FetchBlob('{}_0/fc_w_v'.format(device_prefix))
        w_g = workspace.FetchBlob('{}_0/fc_w_g'.format(device_prefix))
        b_g = workspace.FetchBlob('{}_0/fc_b_g'.format(device_prefix))
        w_0 = workspace.FetchBlob('{}_0/fc_w'.format(device_prefix))
        b_0 = workspace.FetchBlob('{}_0/fc_b'.format(device_prefix))
        w_1 = workspace.FetchBlob('{}_1/fc_w'.format(device_prefix))
        b_1 = workspace.FetchBlob('{}_1/fc_b'.format(device_prefix))

        # Check momentum update step
        np.testing.assert_equal(v_b, 0.5 * v_b_ + g_b)
        np.testing.assert_equal(v_w, 0.5 * v_w_ + g_w)

        np.testing.assert_equal(w_g, w_0)
        np.testing.assert_equal(w_g, w_1)
        np.testing.assert_equal(b_g, b_0)
        np.testing.assert_equal(b_g, b_1)

        # Check params update step
        np.testing.assert_equal(w_0, w_g_ + v_w)
        np.testing.assert_equal(b_0, b_g_ + v_b)


@unittest.skipIf(not workspace.has_gpu_support, "No gpu support.")
@unittest.skipIf(workspace.NumCudaDevices() < 2, "Need at least 2 GPUs.")
class SparseDataParallelModelTestWithSharedIndices(TestCase):
    '''
    Create and run the model. We try with both storing indices for gather
    on CPU and on GPU
    '''
    def run_model(self, V, gpu_devices):

        def input_builder_fun(model):
            return None

        def model_build_fun(model, loss_scale):
            gpu_vecs_gathered = []
            gpu_vecs = []
            for num, vec in enumerate(self.vecs):
                gpu_vec = model.param_init_net.CopyCPUToGPU(
                    vec, 'gpuvec_{}'.format(num),
                )
                if num != 2:
                    model.params.append(gpu_vec)
                gpu_vecs.append(gpu_vec)
            for num, gpu_vec in enumerate(gpu_vecs):
                gpu_vec_gathered = model.net.Gather(
                    [gpu_vec, 'indices'],
                    ['gpu_vec_gathered_{}'.format(num)]
                )
                gpu_vecs_gathered.append(gpu_vec_gathered)

            assert len(gpu_vecs_gathered) == 3

            fc = model.net.FC(
                [
                    gpu_vecs_gathered[2],
                    gpu_vecs_gathered[0],
                    gpu_vecs_gathered[1],
                ],
                ['fc'],
            )
            _, loss = model.net.SoftmaxWithLoss(
                [fc, 'label'],
                ['ce_loss', 'avg_loss'],
                only_loss=True,
            )
            loss = model.Scale(loss, scale=loss_scale)
            model.net.Print(loss, [], limit=10)
            return [loss]

        def param_update_fun(model):
            ONE = model.param_init_net.ConstantFill(
                [], "ONE", shape=[1], value=1.0,
            )
            LR = model.CopyCPUToGPU(self.LR, "LR")
            for param in model.GetParams():
                param_grad = model.param_to_grad[param]
                if not isinstance(param_grad, core.GradientSlice):
                    model.WeightedSum([param, ONE, param_grad, LR], param)
                else:
                    model.net.ScatterWeightedSum(
                        [
                            param,
                            ONE,
                            param_grad.indices,
                            param_grad.values,
                            ONE,
                        ],
                        param,
                    )

        workspace.ResetWorkspace()
        model = cnn.CNNModelHelper(
            order="NHWC",
            name="sparse_test{}".format(gpu_devices),
        )
        batch_size = 32
        batch_per_device = batch_size // len(gpu_devices)

        with core.NameScope("cpu"):
            with core.DeviceScope(core.DeviceOption(caffe2_pb2.CPU)):
                self.ITER = model.Iter("ITER")
                self.LR = model.net.LearningRate(
                    [self.ITER],
                    "LR",
                    base_lr=(-0.1),
                    policy="fixed",
                )
                '''
                self.vecs consists of 3 big blobs on which we call Gather:
                1) FC weights, shape=(V, 16)
                2) FC bias, shape=(V)
                3) FC input, shape=(batch_per_device, 16)
                '''
                self.vecs = [
                    model.param_init_net.UniformFill(
                        [], "vec_{}".format(num), shape=[V, 16])
                    for num in range(2)
                ]
                self.vecs.append(
                    model.param_init_net.UniformFill(
                        [],
                        "vec_2", shape=[batch_per_device, 16]
                    )
                )
                self.ONE_CPU = model.param_init_net.ConstantFill(
                    [], "ONE_CPU", shape=[1], value=1.0,
                )

        data_parallel_model.Parallelize_GPU(
            model,
            input_builder_fun=input_builder_fun,
            forward_pass_builder_fun=model_build_fun,
            param_update_builder_fun=param_update_fun,
            devices=gpu_devices,
        )

        # Update the vecs
        with core.DeviceScope(core.DeviceOption(caffe2_pb2.CUDA, 0)):
            for num, vec in enumerate(self.vecs[:-1]):
                model.CopyGPUToCPU("gpu_0/gpuvec_{}".format(num), vec)

        # Each run has same input, independent of number of gpus
        for i in range(0, 10):
            np.random.seed(2603)
            full_indices = np.random.permutation(V)[:batch_size].reshape(
                batch_size
            )
            full_labels = full_indices[:] % batch_per_device

            for (j, g) in enumerate(gpu_devices):
                st = j * batch_per_device
                en = st + batch_per_device
                indices = full_indices[st:en].astype(np.int32)
                labels = full_labels[st:en].astype(np.int32)

                with core.DeviceScope(core.DeviceOption(caffe2_pb2.CUDA, g)):
                    workspace.FeedBlob("gpu_{}/indices".format(g), indices)
                    workspace.FeedBlob("gpu_{}/label".format(g), labels)

            if i == 0:
                workspace.RunNetOnce(model.param_init_net)
                # Force vecs to be same on all runs
                orig_vecs = [
                    np.random.rand(V, 16).astype(np.float32),
                    np.random.rand(V).astype(np.float32),
                    np.random.rand(V, 16).astype(np.float32),
                ]
                for vec, orig_vec in zip(self.vecs, orig_vecs):
                    workspace.FeedBlob(
                        vec,
                        orig_vec
                    )
                for g in gpu_devices:
                    for num, orig_vec in enumerate(orig_vecs):
                        workspace.FeedBlob(
                            "gpu_{}/gpuvec_{}".format(g, num),
                            orig_vec,
                            device_option=core.DeviceOption(
                                caffe2_pb2.CUDA, g),
                        )
                workspace.CreateNet(model.net)

            workspace.RunNet(model.net.Proto().name)

            idx = workspace.FetchBlob('gpu_0/indices')
            grad_slices = [
                workspace.FetchBlob(
                    'gpu_{}/gpu_vec_gathered_{}_grad'.format(g, num))
                for g in gpu_devices for num in range(2)
            ]
            for grad_slice in grad_slices:
                # print (len(idx), len(grad_slice))
                assert len(idx) == len(grad_slice), (
                    'Number of indices {} is not same as number of gradient '
                    'slices {}. This might lead to illegal memory access'.format(
                        len(idx), len(grad_slice)
                    )
                )

    def test_sparse_shared_indices_gpu(self):
        '''
        Test that the model has same number of indices and gradient rows
        given total batchsize, independent of number of GPUs.
        '''
        V = 10000
        self.run_model(V, [0, 1])
        self.run_model(V, [0])

        if workspace.NumCudaDevices() >= 4:
            self.run_model(V, list(range(4)))

        if workspace.NumCudaDevices() >= 8:
            self.run_model(V, list(range(8)))


if __name__ == "__main__":
    import unittest
    unittest.main()