Files
pytorch/torch/csrc/autograd/python_engine.cpp
Wanchao Liang 618104185b [autograd] enable graph level thread parallelism on CPU (#33157)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/33157

This PR enables graph-level thread parallelism on CPU for the Autograd
Engine. It replaces https://github.com/pytorch/pytorch/pull/29574, which
took a task-level parallelism approach that has drawbacks with the
existing autograd system.

Fixes https://github.com/pytorch/pytorch/issues/18333

The design for graph-level parallelism on CPU:

1. Remove the single CPU thread that was initialized inside the Engine
   itself and allow the owning thread (the one that calls Engine::execute)
   to drive the Engine execution, so that outer threading code can provide
   thread parallelism (see the sketch after this list).
2. Maintain a separate ReadyQueue per CPU thread, and stash the ReadyQueue
   for different devices/threads into a thread-local shared_ptr; the Engine
   itself memorizes the shared_ptr of the ReadyQueue for each device other
   than CPU.
3. The CPU thread-local ReadyQueue is initialized on each CPU-thread
   Engine::execute call (i.e. each `backward()` or `grad()` call), and its
   shared_ptr is memorized in the GraphTask, since every `backward()` call
   has its own GraphTask.
4. Cross-device NodeTask pushes are handled by 2 and 3: we can refer to a
   device's ReadyQueue from the Engine and to the CPU's ReadyQueue from the
   GraphTask, which means we can push to a different ReadyQueue according
   to the device.
5. Termination of the CPU thread: once we mark the graph_task as completed,
   we exit the while loop and terminate the current backward execution,
   because it is guaranteed that all other NodeTasks are finished before we
   mark a GraphTask as complete.
6. The re-entrant thread logic stays the same; reentrant-call detection is
   similar to before: we set worker_device to NO_DEVICE initially and set
   it to CPU afterward to detect whether a call is reentrant.
7. We still have the reentrant thread pool that creates new threads for
   deep reentrant cases, and it reuses the ReadyQueue of the parent thread
   for performance.
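
As a minimal illustration of item 1 (a hedged sketch; the tensors, shapes, and
worker function are made up for this example, not taken from the PR), each
Python thread below builds its own graph and drives its own backward pass on
CPU, instead of queueing behind a single engine-owned CPU worker thread:

```python
import threading

import torch


def worker():
    # Each thread runs its own forward pass, so it builds its own graph and
    # its own GraphTask that shares nothing with the other threads.
    w = torch.randn(10, 1, requires_grad=True)
    x = torch.randn(4, 10)
    loss = (x @ w).sum()
    # With graph-level parallelism the calling thread itself drives the
    # backward execution instead of handing it off to a single engine-owned
    # CPU worker thread.
    loss.backward()


threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```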

Since we introduce thread parallelism on CPU, we have to ensure thread
safety for the GraphTask. This is not a problem if we execute every
forward pass in a different thread, because each thread builds its own
GraphTask and each GraphTask is a separate instance that shares nothing;
i.e. Hogwild training on CPU should be fine in this case.

But there might be cases where the user wants to run part of the work in
a single thread and the rest of the work in several threads concurrently,
so thread safety is crucial there. The thread safety strategy for
multithreaded autograd is as follows:

1. Add a mutex to each Autograd Node/Function to protect its thread
   safety, and hold the lock in the data-racing cases below.
2. Lock the mutex during Node::apply(); this ensures that Nodes writing to
   shared variables do not race across threads (i.e. AccumulateGrad, and
   custom C++ Autograd Nodes that write to shared variables). A user-level
   analogue is sketched after this list.
3. Lock the mutex during Node::release_variables(); this serves the
   purpose that when we release saved_variables from one thread, no other
   thread can call Node::apply(), which ensures the variable references
   held by other threads are not dangling.
4. If the Node does not release any variables and has no shared data
   reads/writes (i.e. it is purely functional), we don't lock the mutex.
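
As a user-level analogue of item 2 (a hedged sketch; the counter, lock, and
class name are hypothetical), the engine's per-Node mutex protects autograd's
own shared state such as AccumulateGrad and saved variables, but any extra
state a custom function touches in backward still needs the user's own
synchronization:

```python
import threading

import torch

# Hypothetical shared state mutated from backward(); it is not managed by
# autograd, so it needs its own lock.
backward_calls = 0
counter_lock = threading.Lock()


class CountingIdentity(torch.autograd.Function):
    @staticmethod
    def forward(ctx, x):
        return x.clone()

    @staticmethod
    def backward(ctx, grad_output):
        global backward_calls
        # The engine's per-Node mutex serializes apply() on a single Node,
        # but this module-level counter is shared across all Nodes and
        # threads, so we guard it ourselves.
        with counter_lock:
            backward_calls += 1
        return grad_output


# Usage: y = CountingIdentity.apply(x)
```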

This way we protect thread safety for the Autograd Node itself, but we
still cannot protect thread safety for a Node's pre/post C++ hooks
(Python hooks are automatically thread safe); we rely on users to write
thread-safe C++ hooks if they want the hooks to be applied correctly in a
multithreaded environment.

**User visible changes**:
There are not many user visible changes. Since we use the owning thread
to drive the autograd execution, users can write their own threading code
without being blocked by the Autograd engine. Some behaviors users should
be aware of:

**Non-determinism**:
If we call backward() on multiple threads concurrently with shared inputs
(i.e. Hogwild CPU training), then since parameters are automatically shared
across threads, gradient accumulation can become non-deterministic across
the concurrent backward calls: two backward calls might access and try to
accumulate into the same .grad attribute. This is technically not safe; it
can lead to a race condition and the result may be invalid.

But this is an expected pattern when users drive the whole training
process with multithreading and shared parameters; users who choose
multithreading should have that threading model in mind and expect this
to happen. Users should use the functional interface
`torch.autograd.grad()` to calculate the gradients instead of calling
`backward()` on the loss.
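
A hedged sketch of that suggestion (the model, loss, and worker function are
placeholders): each thread computes its gradients functionally with
`torch.autograd.grad()` rather than accumulating into the shared `.grad`
attributes via `backward()`:

```python
import threading

import torch

# Hypothetical model whose parameters are shared across threads.
model = torch.nn.Linear(10, 1)
params = list(model.parameters())


def worker():
    x = torch.randn(4, 10)
    loss = model(x).sum()
    # torch.autograd.grad() hands the gradients back to this thread instead
    # of accumulating them into the shared .grad attributes, avoiding the
    # racy cross-thread accumulation described above.
    grads = torch.autograd.grad(loss, params)
    # ... apply `grads` however the training scheme requires ...


threads = [threading.Thread(target=worker) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```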

**Graph retaining**:
If part of the autograd graph is shared between threads, e.g. the first
part of the forward pass runs in a single thread and the second part runs
in multiple threads, then the first part of the graph is shared. In this
case different threads executing grad() or backward() on the same graph
might destroy the graph on the fly from one thread, and the other thread
will crash. We will error out to the user, similar to calling `backward()`
twice without `retain_graph=True`, and let them know they should use
`retain_graph=True`.
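
A hedged sketch of the shared-graph case (the shapes and ops are
illustrative): the shared prefix of the graph is built once, and each thread
that backpropagates through it passes `retain_graph=True` so the shared
portion is not freed out from under the other threads:

```python
import threading

import torch

x = torch.randn(5, 5, requires_grad=True)
# First part of the forward pass, run once in a single thread; the graph
# below `y` is shared by every thread that backpropagates through it.
y = x * x + x


def worker():
    # Second part of the forward pass, private to this thread.
    out = y.tanh().sum()
    # retain_graph=True keeps the shared part of the graph alive so the
    # backward calls from the other threads do not hit a freed graph.
    out.backward(retain_graph=True)


threads = [threading.Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```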

**TODOs**:

[ ] benchmark the PR with example models and datasets to demonstrate
the performance gain in CPU training
[ ] ensure that we don't regress the single thread autograd performance

**Follow ups**:

[ ] a correct and tight integration with distributed autograd
[ ] try to unify the thread pool between JIT and Autograd, and see if
there's unifying pattern that we could apply universally

Test Plan: Imported from OSS

Differential Revision: D20236771

Pulled By: wanchaol

fbshipit-source-id: 1e0bd4eec14ffebeffdb60b763b8d6f0e427eb64
2020-03-26 17:17:52 -07:00


#include <torch/csrc/autograd/python_engine.h>
#include <torch/csrc/DynamicTypes.h>
#include <torch/csrc/PtrWrapper.h>
#include <torch/csrc/THP.h>
#include <torch/csrc/autograd/edge.h>
#include <torch/csrc/autograd/engine.h>
#include <torch/csrc/autograd/function.h>
#include <torch/csrc/autograd/python_anomaly_mode.h>
#include <torch/csrc/autograd/python_function.h>
#include <pybind11/pybind11.h>
#ifndef _WIN32
#include <pthread.h>
#endif
#include <unordered_set>
#include <memory> // for unique_ptr
using namespace torch::autograd;

struct THPEngine {
  PyObject_HEAD
};

static bool _reinitialize_engine = false;

namespace torch { namespace autograd { namespace python {

PythonEngine::PythonEngine() = default;

Engine& PythonEngine::get_python_engine() {
  static PythonEngine engine;
  // This is "probably" thread-safe because the flag is set in a fork handler
  // before any threads are created, and this function is only called with the
  // GIL held. However, using fork + threads is playing with fire so this is
  // more of a "best effort" thing. For example, if the fork occurs while the
  // backwards threads hold a lock, we'll probably deadlock in the engine
  // destructor.
  if (_reinitialize_engine) {
    engine.release_workers();
    engine.~PythonEngine();
    new (&engine) torch::autograd::python::PythonEngine();
    _reinitialize_engine = false;
  }
  return engine;
}

void PythonEngine::thread_init(int device, const std::shared_ptr<ReadyQueue>& ready_queue) {
  // Create a PyThreadState, but release the GIL. This lets pybind11::gil_scoped_acquire calls
  // inside thread_main acquire the GIL without having to create a new
  // PyThreadState each time.
  pybind11::gil_scoped_acquire gil;
  pybind11::gil_scoped_release no_gil;
  Engine::thread_init(device, ready_queue);
}

void PythonEngine::thread_on_exception(
    std::shared_ptr<GraphTask> graph_task,
    const std::shared_ptr<Node>& fn,
    std::exception& e) {
  auto python_err = dynamic_cast<python_error*>(&e);
  if (python_err) {
    python_err->persist();
  }
  Engine::thread_on_exception(graph_task, fn, e);
}

std::unique_ptr<AnomalyMetadata> PythonEngine::make_anomaly_metadata() {
  return std::unique_ptr<AnomalyMetadata>(new PyAnomalyMetadata());
}

variable_list PythonEngine::execute(
    const edge_list& roots,
    const variable_list& inputs,
    bool keep_graph,
    bool create_graph,
    const edge_list& outputs) {
  TORCH_CHECK(!PyGILState_Check(), "The autograd engine was called while holding the GIL. If you are using the C++ "
      "API, the autograd engine is an expensive operation that does not require the "
      "GIL to be held so you should release it with 'pybind11::gil_scoped_release no_gil;'"
      ". If you are not using the C++ API, please report a bug to the pytorch team.");
  try {
    return Engine::execute(roots, inputs, keep_graph, create_graph, outputs);
  } catch (python_error& e) {
    e.restore();
    throw;
  }
}

std::shared_ptr<FutureVariableList> PythonEngine::execute_with_graph_task(
    const std::shared_ptr<GraphTask>& graph_task,
    std::shared_ptr<Node> graph_root,
    bool async_mode) {
  try {
    return Engine::execute_with_graph_task(graph_task, graph_root, async_mode);
  } catch (python_error& e) {
    pybind11::gil_scoped_acquire gil;
    if (!PyErr_Occurred()) {
      // Set the error indicator only if it is not set already.
      e.restore();
    }
    throw;
  }
}

}}} // namespace torch::autograd::python

PyObject *THPEngineClass = nullptr;
// Implementation of torch._C._EngineBase.run_backward
PyObject *THPEngine_run_backward(THPEngine *self, PyObject *args, PyObject *kwargs)
{
  HANDLE_TH_ERRORS
  PyObject *tensors = nullptr;
  PyObject *grad_tensors = nullptr;
  unsigned char keep_graph = 0;
  unsigned char create_graph = 0;
  PyObject *inputs = nullptr;
  unsigned char allow_unreachable = 0;
  const char *accepted_kwargs[] = {
      "tensors", "grad_tensors", "keep_graph", "create_graph", "inputs",
      "allow_unreachable", nullptr
  };
  if (!PyArg_ParseTupleAndKeywords(args, kwargs, "OObb|Ob", (char**)accepted_kwargs,
        &tensors, &grad_tensors, &keep_graph, &create_graph, &inputs, &allow_unreachable))
    return nullptr;

  THPUtils_assert(PyTuple_Check(tensors), "tensors argument is expected to "
      "be a tuple, but got %s", THPUtils_typename(tensors));
  THPUtils_assert(PyTuple_Check(grad_tensors), "grad_tensors argument is "
      "expected to be a tuple, but got %s", THPUtils_typename(grad_tensors));

  Py_ssize_t num_tensors = PyTuple_GET_SIZE(tensors);
  Py_ssize_t num_gradients = PyTuple_GET_SIZE(grad_tensors);
  THPUtils_assert(num_tensors == num_gradients, "got %ld tensors and %ld "
      "gradients", num_tensors, num_gradients);

  // Convert the (tensors, grad_tensors) tuples into autograd roots and the
  // gradients fed into them.
  edge_list roots;
  roots.reserve(num_tensors);
  variable_list grads;
  grads.reserve(num_tensors);
  for (int i = 0; i < num_tensors; i++) {
    PyObject *_tensor = PyTuple_GET_ITEM(tensors, i);
    THPUtils_assert(THPVariable_Check(_tensor), "element %d of tensors "
        "tuple is not a Tensor", i);
    auto& variable = ((THPVariable*)_tensor)->cdata;
    auto gradient_edge = torch::autograd::impl::gradient_edge(variable);
    THPUtils_assert(gradient_edge.function,
        "element %d of tensors does not require grad and does not have a grad_fn", i);
    roots.push_back(std::move(gradient_edge));

    PyObject *grad = PyTuple_GET_ITEM(grad_tensors, i);
    if (THPVariable_Check(grad)) {
      const Variable& grad_var = ((THPVariable*)grad)->cdata;
      if (grad_var.has_names()) {
        TORCH_WARN(
            "Autograd was passed a named grad tensor with dims ", grad_var.names(),
            ". Autograd does not yet support named tensor semantics, so all names ",
            "will be ignored. In practice all computed gradients will still be correct "
            "according to regular tensor semantics.");
      }
      grads.push_back(grad_var);
    } else {
      THPUtils_assert(grad == Py_None,
          "element %d of gradients tuple is not a Tensor or None", i);
      THPUtils_assert(!variable.requires_grad(),
          "element %d of gradients tuple is None, but the corresponding Tensor requires grad", i);
    }
  }

  // If explicit inputs were given (the torch.autograd.grad case), build the
  // output edges whose gradients should be returned to Python.
  std::vector<Edge> output_edges;
  if (inputs != nullptr) {
    int num_inputs = PyTuple_GET_SIZE(inputs);
    output_edges.reserve(num_inputs);
    for (int i = 0; i < num_inputs; ++i) {
      PyObject *input = PyTuple_GET_ITEM(inputs, i);
      THPUtils_assert(THPVariable_Check(input),
          "all inputs have to be Tensors, but got %s", THPUtils_typename(input));
      THPVariable *input_var = (THPVariable*)input;
      const auto output_nr = input_var->cdata.output_nr();
      auto grad_fn = input_var->cdata.grad_fn();
      if (!grad_fn) {
        grad_fn = torch::autograd::impl::try_get_grad_accumulator(input_var->cdata);
      }
      THPUtils_assert(input_var->cdata.requires_grad(),
          "One of the differentiated Tensors does not require grad");
      if (!grad_fn) {
        output_edges.emplace_back();
      } else {
        output_edges.emplace_back(grad_fn, output_nr);
      }
    }
  }

  variable_list outputs;
  {
    // Release the GIL and run the engine; the calling thread drives the CPU
    // part of the graph execution.
    pybind11::gil_scoped_release no_gil;
    auto& engine = python::PythonEngine::get_python_engine();
    outputs = engine.execute(roots, grads, keep_graph, create_graph, output_edges);
  }

  if (inputs != nullptr) {
    int num_inputs = PyTuple_GET_SIZE(inputs);
    THPObjectPtr py_outputs {PyTuple_New(num_inputs)};
    if (!py_outputs) return nullptr;
    for (int i = 0; i < num_inputs; i++) {
      THPUtils_assert(allow_unreachable || outputs[i].defined(), "One of the "
          "differentiated Tensors appears to not have been used "
          "in the graph. Set allow_unused=True if this is the "
          "desired behavior.");
      PyTuple_SET_ITEM(py_outputs.get(), i, THPVariable_Wrap(outputs[i]));
    }
    return py_outputs.release();
  } else {
    Py_RETURN_NONE;
  }
  END_HANDLE_TH_ERRORS
}

PyObject* THPEngine_queue_callback(PyObject *self, PyObject *_callback) {
  HANDLE_TH_ERRORS
  auto& engine = python::PythonEngine::get_python_engine();
  // Hold a strong reference to the callback for as long as the shared_ptr is
  // alive; the custom deleter re-acquires the GIL before dropping it.
  std::shared_ptr<PyObject> callback(_callback, [](PyObject *obj) { pybind11::gil_scoped_acquire gil; Py_DECREF(obj); });
  Py_INCREF(_callback);
  engine.queue_callback([callback]() {
    pybind11::gil_scoped_acquire gil;
    THPObjectPtr result {PyObject_CallFunctionObjArgs(callback.get(), nullptr)};
    if (!result) throw python_error();
  });
  Py_RETURN_NONE;
  END_HANDLE_TH_ERRORS
}

PyObject* THPEngine_is_checkpoint_valid(PyObject *self, PyObject *noargs) {
  HANDLE_TH_ERRORS
  auto& engine = python::PythonEngine::get_python_engine();
  if (engine.is_checkpoint_valid()) {
    Py_RETURN_TRUE;
  } else {
    Py_RETURN_FALSE;
  }
  END_HANDLE_TH_ERRORS
}

PyObject *THPEngine_new(PyTypeObject *type, PyObject *args, PyObject *kwargs)
{
  return type->tp_alloc(type, 0);
}

static struct PyMethodDef THPEngine_methods[] = {
  {(char*)"run_backward", (PyCFunction)(void(*)(void))THPEngine_run_backward, METH_VARARGS | METH_KEYWORDS, nullptr},
  {(char*)"queue_callback", (PyCFunction)THPEngine_queue_callback, METH_O, nullptr},
  {(char*)"is_checkpoint_valid", (PyCFunction)THPEngine_is_checkpoint_valid, METH_NOARGS, nullptr},
  {nullptr}
};

PyTypeObject THPEngineType = {
  PyVarObject_HEAD_INIT(nullptr, 0)
  "torch._C._EngineBase",                      /* tp_name */
  sizeof(THPEngine),                           /* tp_basicsize */
  0,                                           /* tp_itemsize */
  nullptr,                                     /* tp_dealloc */
  0,                                           /* tp_vectorcall_offset */
  nullptr,                                     /* tp_getattr */
  nullptr,                                     /* tp_setattr */
  nullptr,                                     /* tp_reserved */
  nullptr,                                     /* tp_repr */
  nullptr,                                     /* tp_as_number */
  nullptr,                                     /* tp_as_sequence */
  nullptr,                                     /* tp_as_mapping */
  nullptr,                                     /* tp_hash */
  nullptr,                                     /* tp_call */
  nullptr,                                     /* tp_str */
  nullptr,                                     /* tp_getattro */
  nullptr,                                     /* tp_setattro */
  nullptr,                                     /* tp_as_buffer */
  Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,    /* tp_flags */
  nullptr,                                     /* tp_doc */
  nullptr,                                     /* tp_traverse */
  nullptr,                                     /* tp_clear */
  nullptr,                                     /* tp_richcompare */
  0,                                           /* tp_weaklistoffset */
  nullptr,                                     /* tp_iter */
  nullptr,                                     /* tp_iternext */
  THPEngine_methods,                           /* tp_methods */
  nullptr,                                     /* tp_members */
  nullptr,                                     /* tp_getset */
  nullptr,                                     /* tp_base */
  nullptr,                                     /* tp_dict */
  nullptr,                                     /* tp_descr_get */
  nullptr,                                     /* tp_descr_set */
  0,                                           /* tp_dictoffset */
  nullptr,                                     /* tp_init */
  nullptr,                                     /* tp_alloc */
  THPEngine_new                                /* tp_new */
};

static void child_atfork() {
  _reinitialize_engine = true;
}

bool THPEngine_initModule(PyObject *module)
{
#ifndef _WIN32
  if (pthread_atfork(nullptr, nullptr, child_atfork) != 0) {
    throw std::runtime_error("unable to set pthread_atfork handler");
  }
#endif
  if (PyType_Ready(&THPEngineType) < 0)
    return false;
  Py_INCREF(&THPEngineType);
  PyModule_AddObject(module, "_ImperativeEngine", (PyObject *)&THPEngineType);
  set_default_engine_stub(python::PythonEngine::get_python_engine);
  return true;
}