From 94e12f97dca4b6dcc3760bddb5a421f1441f1966 Mon Sep 17 00:00:00 2001 From: cyy Date: Thu, 10 Oct 2024 18:05:32 +0000 Subject: [PATCH] [Distributed] [16/N] Fix clang-tidy warnings in torch/csrc/distributed/c10d (#137404) Follows #137072 Pull Request resolved: https://github.com/pytorch/pytorch/pull/137404 Approved by: https://github.com/Skylion007 --- test/cpp/c10d/BackoffTest.cpp | 3 - test/cpp/c10d/FileStoreTest.cpp | 6 +- test/cpp/c10d/HashStoreTest.cpp | 10 +-- test/cpp/c10d/ProcessGroupGlooAsyncTest.cpp | 34 ++++----- test/cpp/c10d/ProcessGroupGlooTest.cpp | 76 +++++++++---------- test/cpp/c10d/ProcessGroupMPITest.cpp | 34 ++++----- test/cpp/c10d/ProcessGroupNCCLErrorsTest.cpp | 28 ++++--- test/cpp/c10d/ProcessGroupNCCLTest.cpp | 48 ++++++------ test/cpp/c10d/ProcessGroupUCCTest.cpp | 11 ++- test/cpp/c10d/TCPStoreTest.cpp | 54 ++++++------- torch/csrc/distributed/c10d/Backoff.cpp | 3 +- torch/csrc/distributed/c10d/Functional.cpp | 10 +-- torch/csrc/distributed/c10d/NCCLUtils.cpp | 72 ++++++++++-------- torch/csrc/distributed/c10d/NCCLUtils.hpp | 4 +- .../distributed/c10d/ProcessGroupNCCL.cpp | 75 +++++++++--------- .../distributed/c10d/ProcessGroupNCCL.hpp | 24 +++--- torch/csrc/distributed/c10d/TraceUtils.h | 21 +++-- torch/csrc/distributed/c10d/c10d.h | 8 +- torch/csrc/distributed/c10d/error.h | 6 +- torch/csrc/distributed/c10d/logging.h | 6 +- torch/csrc/distributed/c10d/socket.h | 6 +- torch/csrc/distributed/c10d/socket_fmt.h | 6 +- 22 files changed, 256 insertions(+), 289 deletions(-) diff --git a/test/cpp/c10d/BackoffTest.cpp b/test/cpp/c10d/BackoffTest.cpp index 054f30ba4993..b229ec5dbfef 100644 --- a/test/cpp/c10d/BackoffTest.cpp +++ b/test/cpp/c10d/BackoffTest.cpp @@ -1,9 +1,6 @@ #include #include "StoreTestCommon.hpp" -#include -#include - #include TEST(BackoffTest, exponentialBackoffDefaults) { diff --git a/test/cpp/c10d/FileStoreTest.cpp b/test/cpp/c10d/FileStoreTest.cpp index 29b4b370b011..68eb10aef6ca 100644 --- a/test/cpp/c10d/FileStoreTest.cpp +++ b/test/cpp/c10d/FileStoreTest.cpp @@ -40,7 +40,7 @@ std::string tmppath() { } #endif -void testGetSet(std::string path, std::string prefix = "") { +void testGetSet(const std::string& path, const std::string& prefix = "") { // Basic Set/Get on File Store { auto fileStore = c10::make_intrusive(path, 2); @@ -100,7 +100,7 @@ void stressTestStore(std::string path, std::string prefix = "") { c10d::test::Semaphore sem1, sem2; for (C10_UNUSED const auto i : c10::irange(numThreads)) { - threads.emplace_back(std::thread([&] { + threads.emplace_back([&] { auto fileStore = c10::make_intrusive(path, numThreads + 1); c10d::PrefixStore store(prefix, fileStore); @@ -109,7 +109,7 @@ void stressTestStore(std::string path, std::string prefix = "") { for (C10_UNUSED const auto j : c10::irange(numIterations)) { store.add("counter", 1); } - })); + }); } sem1.wait(numThreads); diff --git a/test/cpp/c10d/HashStoreTest.cpp b/test/cpp/c10d/HashStoreTest.cpp index f3478f6071b1..df4d53ef3afa 100644 --- a/test/cpp/c10d/HashStoreTest.cpp +++ b/test/cpp/c10d/HashStoreTest.cpp @@ -3,15 +3,15 @@ #include -#include #include #include #include +#include constexpr int64_t kShortStoreTimeoutMillis = 100; -void testGetSet(std::string prefix = "") { +void testGetSet(const std::string& prefix = "") { // Basic set/get { auto hashStore = c10::make_intrusive(); @@ -60,16 +60,16 @@ void stressTestStore(std::string prefix = "") { std::vector threads; c10d::test::Semaphore sem1, sem2; auto hashStore = c10::make_intrusive(); - c10d::PrefixStore store(prefix, hashStore); + 
c10d::PrefixStore store(std::move(prefix), hashStore); for (C10_UNUSED const auto i : c10::irange(numThreads)) { - threads.emplace_back(std::thread([&] { + threads.emplace_back([&] { sem1.post(); sem2.wait(); for (C10_UNUSED const auto j : c10::irange(numIterations)) { store.add("counter", 1); } - })); + }); } sem1.wait(numThreads); diff --git a/test/cpp/c10d/ProcessGroupGlooAsyncTest.cpp b/test/cpp/c10d/ProcessGroupGlooAsyncTest.cpp index d4c1d714d4ee..629a19628039 100644 --- a/test/cpp/c10d/ProcessGroupGlooAsyncTest.cpp +++ b/test/cpp/c10d/ProcessGroupGlooAsyncTest.cpp @@ -6,14 +6,13 @@ #include #include #include "CUDATest.hpp" -#include "TestUtils.hpp" using namespace c10d::test; using at::cuda::CUDAStream; template -std::vector initialize(const std::string& path, int N, Args&&... args) { +std::vector initialize(const std::string& path, size_t N, Args&&... args) { std::vector tests; for (C10_UNUSED const auto i : c10::irange(N)) { tests.push_back(std::move(T(path, std::forward(args)...))); @@ -35,10 +34,7 @@ class AsyncTest { public: AsyncTest(std::string path) : path_(std::move(path)) {} - AsyncTest(AsyncTest&& other) { - path_ = std::move(other.path_); - pg_ = std::move(other.pg_); - } + AsyncTest(AsyncTest&& other) noexcept = default; ::c10d::ProcessGroupGloo& getProcessGroup() { return *pg_; @@ -53,8 +49,8 @@ class AsyncTest { options->devices.push_back( ::c10d::ProcessGroupGloo::createDeviceForHostname("127.0.0.1")); - pg_ = std::unique_ptr<::c10d::ProcessGroupGloo>( - new ::c10d::ProcessGroupGloo(store, rank, size, options)); + pg_ = + std::make_unique<::c10d::ProcessGroupGloo>(store, rank, size, options); } protected: @@ -88,7 +84,7 @@ class AsyncInputIsOutputTest : public AsyncTest { at::cuda::OptionalCUDAGuard deviceGuard; streams_.reserve(numDevices_); for (const auto i : c10::irange(numDevices_)) { - deviceGuard.set_index(i); + deviceGuard.set_index(static_cast(i)); streams_.push_back(at::cuda::getStreamFromPool()); } } @@ -118,7 +114,9 @@ class AsyncInputIsOutputTest : public AsyncTest { } protected: + // NOLINTNEXTLINE(cppcoreguidelines-avoid-const-or-ref-data-members) const int numTensors_; + // NOLINTNEXTLINE(cppcoreguidelines-avoid-const-or-ref-data-members) const int numDevices_; std::vector inputs_; std::vector streams_; @@ -136,13 +134,13 @@ class AsyncAllreduceTest : public AsyncInputIsOutputTest { // Launch sleep on every stream at::cuda::OptionalCUDAGuard deviceGuard; for (const auto i : c10::irange(numDevices_)) { - deviceGuard.set_index(i); - cudaSleep(streams_[i], 10 * 1000 * 1000); + deviceGuard.set_index(static_cast(i)); + cudaSleep(streams_[i], 10ull * 1000 * 1000); } // Launch value initialization for every tensor for (const auto i : c10::irange(numTensors_)) { - deviceGuard.set_index(i % numDevices_); + deviceGuard.set_index(static_cast(i % numDevices_)); inputs_[i].fill_(pg_->getRank() * numTensors_ + i); } @@ -155,26 +153,26 @@ class AsyncBroadcastTest : public AsyncInputIsOutputTest { AsyncBroadcastTest(const std::string& path, int numTensors) : AsyncInputIsOutputTest(path, numTensors) {} - c10::intrusive_ptr run(int rootRank, int rootTensor) { + c10::intrusive_ptr run(size_t rootRank, size_t rootTensor) { // For the duration of this function, make THC use our streams c10::cuda::CUDAMultiStreamGuard guard(streams_); // Launch sleep on every stream at::cuda::OptionalCUDAGuard deviceGuard; for (const auto i : c10::irange(numDevices_)) { - deviceGuard.set_index(i); - cudaSleep(streams_[i], 10 * 1000 * 1000); + deviceGuard.set_index(static_cast(i)); + 
cudaSleep(streams_[i], 10ull * 1000 * 1000); } // Launch value initialization for every tensor for (const auto i : c10::irange(numTensors_)) { - deviceGuard.set_index(i % numDevices_); + deviceGuard.set_index(static_cast(i % numDevices_)); inputs_[i].fill_(pg_->getRank() * numTensors_ + i); } ::c10d::BroadcastOptions options; - options.rootRank = rootRank; - options.rootTensor = rootTensor; + options.rootRank = static_cast(rootRank); + options.rootTensor = static_cast(rootTensor); return pg_->broadcast(inputs_, options); } }; diff --git a/test/cpp/c10d/ProcessGroupGlooTest.cpp b/test/cpp/c10d/ProcessGroupGlooTest.cpp index a5c48bf31cfa..46720df11c4d 100644 --- a/test/cpp/c10d/ProcessGroupGlooTest.cpp +++ b/test/cpp/c10d/ProcessGroupGlooTest.cpp @@ -1,15 +1,12 @@ #ifndef _WIN32 -#include #include #include +#include #endif #include -#include -#include -#include -#include +#include #include #include @@ -30,7 +27,7 @@ constexpr auto kWaitTimeout = std::chrono::milliseconds(1); #ifndef _WIN32 class SignalTest { public: - SignalTest(const std::string& path) : path_(path) {} + SignalTest(std::string path) : path_(std::move(path)) {} ~SignalTest() { if (arm_.joinable()) { @@ -41,7 +38,7 @@ class SignalTest { // Arms test to send signal to PID when the semaphore unlocks. This // happens as soon as the first collective completes successfully. void arm(int pid, int signal) { - arm_ = std::thread([=] { + arm_ = std::thread([this, pid, signal] { sem_.wait(); kill(pid, signal); }); @@ -108,7 +105,7 @@ class ProcessGroupGlooDelayed : public ::c10d::ProcessGroupGloo { int rank, int size, c10::intrusive_ptr options) - : ProcessGroupGloo(store, rank, size, options) {} + : ProcessGroupGloo(store, rank, size, std::move(options)) {} c10::intrusive_ptr<::c10d::Work> send( std::vector& tensors, @@ -127,13 +124,13 @@ class CollectiveTest { bool delayed = false) { std::vector tests; for (C10_UNUSED const auto i : c10::irange(num)) { - tests.emplace_back(CollectiveTest(path)); + tests.emplace_back(path); } std::vector threads; for (const auto i : c10::irange(num)) { - threads.emplace_back(std::thread( - [i, &tests, delayed] { tests[i].start(i, tests.size(), delayed); })); + threads.emplace_back( + [i, &tests, delayed] { tests[i].start(i, tests.size(), delayed); }); } for (auto& thread : threads) { thread.join(); @@ -144,16 +141,13 @@ class CollectiveTest { CollectiveTest(std::string path) : path_(std::move(path)) {} - CollectiveTest(CollectiveTest&& other) { - path_ = std::move(other.path_); - pg_ = std::move(other.pg_); - } + CollectiveTest(CollectiveTest&& other) noexcept = default; ::c10d::ProcessGroupGloo& getProcessGroup() { return *pg_; } - void start(int rank, int size, bool delayed) { + void start(int rank, size_t size, bool delayed) { auto store = c10::make_intrusive<::c10d::FileStore>(path_, size); // Set a timeout that is small enough to make this test run fast, but also @@ -164,11 +158,11 @@ class CollectiveTest { ::c10d::ProcessGroupGloo::createDeviceForHostname("127.0.0.1")); if (!delayed) { - pg_ = std::unique_ptr<::c10d::ProcessGroupGloo>( - new ::c10d::ProcessGroupGloo(store, rank, size, options)); + pg_ = std::make_unique<::c10d::ProcessGroupGloo>( + store, rank, size, options); } else { - pg_ = std::unique_ptr( - new ProcessGroupGlooDelayed(store, rank, size, options)); + pg_ = + std::make_unique(store, rank, size, options); } } @@ -192,13 +186,13 @@ std::vector> copyTensors( } std::vector> waitWork( - std::vector> works) { + const std::vector>& works) { std::vector> outputTensors; for (auto& 
work : works) { try { work->wait(); } catch (const std::exception& ex) { - LOG(ERROR) << "Exception received: " << ex.what() << std::endl; + LOG(ERROR) << "Exception received: " << ex.what() << '\n'; } outputTensors.emplace_back(work->result()); } @@ -206,14 +200,14 @@ std::vector> waitWork( } std::vector> waitFuture( - std::vector> works) { + const std::vector>& works) { std::vector> outputTensors; for (auto& work : works) { auto fut = work->getFuture(); try { fut->wait(); } catch (const std::exception& ex) { - LOG(ERROR) << "Exception received: " << ex.what() << std::endl; + LOG(ERROR) << "Exception received: " << ex.what() << '\n'; } auto result = fut->value(); if (result.isNone()) { @@ -288,8 +282,7 @@ void testAllreduce( auto outputs = waitFuture(work); auto event_lists = disableProfilerLegacy(); - checkProfiledEvents( - std::move(event_lists), GLOO_ALLREDUCE_STR, size, allShapes); + checkProfiledEvents(event_lists, GLOO_ALLREDUCE_STR, size, allShapes); // Verify outputs const auto expected = (size * (size - 1)) / 2; @@ -334,8 +327,7 @@ void testAllreduceUsingWorkAPI( auto outputs = waitWork(work); auto event_lists = disableProfilerLegacy(); - checkProfiledEvents( - std::move(event_lists), GLOO_ALLREDUCE_STR, size, allShapes); + checkProfiledEvents(event_lists, GLOO_ALLREDUCE_STR, size, allShapes); // Verify outputs const auto expected = (size * (size - 1)) / 2; @@ -371,7 +363,8 @@ void testBroadcast( at::OptionalDeviceGuard deviceGuard; for (const auto l : c10::irange(stride)) { if (b == at::DeviceType::CUDA) { - deviceGuard.reset_device(at::Device(at::kCUDA, l)); + deviceGuard.reset_device( + at::Device(at::kCUDA, static_cast(l))); } inputs[k][l] = at::ones(shapes, at::dtype(dtype).device(b)) * (k * stride + l); @@ -396,8 +389,7 @@ void testBroadcast( auto outputs = waitFuture(work); auto event_lists = disableProfilerLegacy(); - checkProfiledEvents( - std::move(event_lists), GLOO_BROADCAST_STR, size, allShapes); + checkProfiledEvents(event_lists, GLOO_BROADCAST_STR, size, allShapes); // Verify outputs const auto expected = (i * stride + j); @@ -427,8 +419,9 @@ void testAlltoall(const std::string& path, const at::DeviceType b) { {30, 31, 32, 33, 34, 35, 36}, }; for (const auto rank : c10::irange(size)) { - const std::vector& blob = blobs[rank]; - inputs[rank] = at::from_blob((int32_t*)(blob.data()), blob.size()).to(b); + std::vector& blob = blobs[rank]; + inputs[rank] = + at::from_blob(blob.data(), static_cast(blob.size())).to(b); } // Allocate outputs @@ -478,7 +471,7 @@ void testAlltoall(const std::string& path, const at::DeviceType b) { } auto event_lists = disableProfilerLegacy(); - checkProfiledEvents(std::move(event_lists), GLOO_A2A_STR, size, allShapes); + checkProfiledEvents(event_lists, GLOO_A2A_STR, size, allShapes); // Verify outputs std::vector> expected = { {0, 1, 10, 11, 12, 20, 21, 30, 31}, @@ -516,7 +509,7 @@ void testBarrier(const std::string& path) { std::vector> allShapes; // Barrier does not use tensors, so skip shape checking. 
checkProfiledEvents( - std::move(event_lists), + event_lists, GLOO_STR, size, allShapes, @@ -533,7 +526,7 @@ void testMonitoredBarrier(const std::string& path) { std::vector threads; threads.reserve(size); for (const auto r : c10::irange(size)) { - threads.emplace_back(std::thread([=]() { runMonitoredBarrier(r); })); + threads.emplace_back([=]() { runMonitoredBarrier(r); }); } for (auto& t : threads) { t.join(); @@ -555,8 +548,7 @@ void testMonitoredBarrier(const std::string& path) { }; threads.clear(); for (const auto r : c10::irange(size)) { - threads.emplace_back( - std::thread([=]() { runMonitoredBarrierWithException(r); })); + threads.emplace_back([=]() { runMonitoredBarrierWithException(r); }); } for (auto& t : threads) { t.join(); @@ -613,14 +605,14 @@ void testSend(const std::string& path) { enableProfilerLegacy(ProfilerConfig( ProfilerState::CPU, /* report_input_shapes */ true, false)); auto sendWork = pg.send(tensors, dstRank, tag); - bool sendCompleted; + bool sendCompleted = false; std::thread waitSendThreadAbort([&]() { sendCompleted = sendWork->wait(); }); sendWork->abort(); // Block until the sendWork gets successfully aborted waitSendThreadAbort.join(); EXPECT_FALSE(sendCompleted); auto event_lists = disableProfilerLegacy(); - checkProfiledEvents(std::move(event_lists), GLOO_SEND_STR, 1, allShapes); + checkProfiledEvents(event_lists, GLOO_SEND_STR, 1, allShapes); // Now create a separate sender thread to ensure that future waitsends can // complete successfully. @@ -663,14 +655,14 @@ void testRecv(const std::string& path) { enableProfilerLegacy(ProfilerConfig( ProfilerState::CPU, /* report_input_shapes */ true, false)); auto recvWork = pg.recv(tensors, srcRank, tag); - bool recvCompleted; + bool recvCompleted = false; std::thread waitRecvThreadAbort([&]() { recvCompleted = recvWork->wait(); }); recvWork->abort(); // Block until the first recv gets successfully aborted waitRecvThreadAbort.join(); EXPECT_FALSE(recvCompleted); auto event_lists = disableProfilerLegacy(); - checkProfiledEvents(std::move(event_lists), GLOO_RECV_STR, 1, allShapes); + checkProfiledEvents(event_lists, GLOO_RECV_STR, 1, allShapes); // Now create a separate receiver thread to ensure that future waits can // complete successfully. 
diff --git a/test/cpp/c10d/ProcessGroupMPITest.cpp b/test/cpp/c10d/ProcessGroupMPITest.cpp index d9fcacc83d2f..1112ab723bd5 100644 --- a/test/cpp/c10d/ProcessGroupMPITest.cpp +++ b/test/cpp/c10d/ProcessGroupMPITest.cpp @@ -5,23 +5,21 @@ #include #include -#include #include -#include #define STR_HELPER(x) #x #define STR(x) STR_HELPER(x) // Wait for work to complete std::vector> waitWork( - c10::intrusive_ptr<::c10d::ProcessGroupMPI> pg, - std::vector> works) { + const c10::intrusive_ptr<::c10d::ProcessGroupMPI>& pg, + const std::vector>& works) { std::vector> outputTensors; for (auto& work : works) { try { work->wait(); } catch (const std::exception& ex) { - std::cerr << "Exception received: " << ex.what() << std::endl; + std::cerr << "Exception received: " << ex.what() << '\n'; pg->abort(); } outputTensors.emplace_back(work->result()); @@ -31,15 +29,15 @@ std::vector> waitWork( // Wait using Futures std::vector> waitFuture( - c10::intrusive_ptr<::c10d::ProcessGroupMPI> pg, - std::vector> works) { + const c10::intrusive_ptr<::c10d::ProcessGroupMPI>& pg, + const std::vector>& works) { std::vector> outputTensors; for (auto& work : works) { auto fut = work->getFuture(); try { fut->wait(); } catch (const std::exception& ex) { - std::cerr << "Exception received: " << ex.what() << std::endl; + std::cerr << "Exception received: " << ex.what() << '\n'; pg->abort(); } auto result = fut->value(); @@ -78,7 +76,7 @@ void testAllreduce(int iter = 1000) { const auto expected = worldSize * i; auto data = outputTensors[i][0].data_ptr(); for (auto j = 0; j < outputTensors[i][0].numel(); ++j) { - if (data[j] != expected) { + if (data[j] != static_cast(expected)) { TORCH_CHECK(false, "BOOM!"); } } @@ -110,7 +108,7 @@ void testBroadcast(int iter = 10000) { const auto expected = i; auto data = outputTensors[i][0].data_ptr(); for (auto j = 0; j < outputTensors[i][0].numel(); ++j) { - if (data[j] != expected) { + if (data[j] != static_cast(expected)) { TORCH_CHECK(false, "BOOM!"); } } @@ -140,7 +138,7 @@ void testReduce(int iter = 10000) { const auto expected = worldSize * i; auto data = outputTensors[i][0].data_ptr(); for (auto j = 0; j < outputTensors[i][0].numel(); ++j) { - if (data[j] != expected) { + if (data[j] != static_cast(expected)) { TORCH_CHECK(false, "BOOM!"); } } @@ -179,7 +177,7 @@ void testAllgather(int iter = 10000) { const auto expected = i * j; auto data = outputTensors[i][j].data_ptr(); for (auto k = 0; k < outputTensors[i][j].numel(); ++k) { - if (data[k] != expected) { + if (data[k] != static_cast(expected)) { TORCH_CHECK(false, "BOOM!"); } } @@ -222,7 +220,7 @@ void testGather(int iter = 10000) { const auto expected = i * j; auto data = outputTensors[i][j].data_ptr(); for (auto k = 0; k < outputTensors[i][j].numel(); ++k) { - if (data[k] != expected) { + if (data[k] != static_cast(expected)) { TORCH_CHECK(false, "BOOM!"); } } @@ -230,7 +228,7 @@ void testGather(int iter = 10000) { } } else { for (const auto i : c10::irange(iter)) { - if (outputTensors[i].size() != 0) { + if (!outputTensors[i].empty()) { TORCH_CHECK(false, "BOOM!"); } } @@ -271,7 +269,7 @@ void testScatter(int iter = 1) { const auto expected = i * j; auto data = outputTensors[i][0].data_ptr(); for (auto k = 0; k < outputTensors[i][0].numel(); ++k) { - if (data[k] != expected) { + if (data[k] != static_cast(expected)) { TORCH_CHECK(false, "BOOM!"); } } @@ -331,7 +329,7 @@ void testSendRecv(bool recvAnysource, int iter = 10000) { const auto expected = i; auto data = outputTensors[i][0].data_ptr(); for (auto j = 0; j < 
outputTensors[i][0].numel(); ++j) { - if (data[j] != expected) { + if (data[j] != static_cast(expected)) { TORCH_CHECK(false, "BOOM!"); } } @@ -349,7 +347,7 @@ int main(int argc, char** argv) { #ifdef MPIEXEC // If we are within an openmpi mpirun, then skip the exec if (!std::getenv("OMPI_COMM_WORLD_SIZE")) { - std::cout << "Execute mpiexec from: " << STR(MPIEXEC) << std::endl; + std::cout << "Execute mpiexec from: " << STR(MPIEXEC) << '\n'; execl(STR(MPIEXEC), "-np 2", argv[0], (char*)nullptr); } @@ -363,7 +361,7 @@ int main(int argc, char** argv) { testSendRecv(true); testBackendName(); - std::cout << "Test successful" << std::endl; + std::cout << "Test successful" << '\n'; #else std::cout << "MPI executable not found, skipping test" << std::endl; #endif diff --git a/test/cpp/c10d/ProcessGroupNCCLErrorsTest.cpp b/test/cpp/c10d/ProcessGroupNCCLErrorsTest.cpp index 7bea34273a44..d3650954f62e 100644 --- a/test/cpp/c10d/ProcessGroupNCCLErrorsTest.cpp +++ b/test/cpp/c10d/ProcessGroupNCCLErrorsTest.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include "CUDATest.hpp" #include "TestUtils.hpp" @@ -47,7 +48,7 @@ class ProcessGroupNCCLSimulateErrors : public c10d::ProcessGroupNCCL { int rank, int size, c10::intrusive_ptr opts) - : ProcessGroupNCCL(store, rank, size, opts), simulateError_(false) {} + : ProcessGroupNCCL(store, rank, size, std::move(opts)) {} std::exception_ptr checkForNCCLErrors( std::shared_ptr& ncclComm) override { @@ -93,7 +94,7 @@ class ProcessGroupNCCLSimulateErrors : public c10d::ProcessGroupNCCL { } private: - bool simulateError_; + bool simulateError_{false}; }; class WorkNCCLTimedoutErrors : public c10d::ProcessGroupNCCL::WorkNCCL { @@ -127,9 +128,7 @@ class ProcessGroupNCCLTimedOutErrors : public ProcessGroupNCCLSimulateErrors { int rank, int size, c10::intrusive_ptr opts) - : ProcessGroupNCCLSimulateErrors(store, rank, size, opts), - watchDogDebugInfoFinished_(false), - setTimedoutError_(false) {} + : ProcessGroupNCCLSimulateErrors(store, rank, size, std::move(opts)) {} c10::intrusive_ptr initWork( at::Device& device, @@ -177,10 +176,10 @@ class ProcessGroupNCCLTimedOutErrors : public ProcessGroupNCCLSimulateErrors { watchDogDebugInfoFinished_ = true; return ""; } - bool watchDogDebugInfoFinished_; + bool watchDogDebugInfoFinished_{false}; private: - bool setTimedoutError_; + bool setTimedoutError_{false}; }; class ProcessGroupNCCLNoHeartbeatCaught @@ -191,8 +190,7 @@ class ProcessGroupNCCLNoHeartbeatCaught int rank, int size, c10::intrusive_ptr opts) - : ProcessGroupNCCLTimedOutErrors(store, rank, size, opts), - hasMonitorThreadCaughtError_(false) {} + : ProcessGroupNCCLTimedOutErrors(store, rank, size, std::move(opts)) {} std::mutex& getWatchdogMutex() { return workMetaListMutex_; @@ -223,11 +221,11 @@ class ProcessGroupNCCLNoHeartbeatCaught // It's really hard to unit test std::abort. So we override it instead. // Commented this override, we do see process aborted with core dump without // this override. 
- void terminateProcess(std::string errMsg) override { + void terminateProcess(const std::string& errMsg) override { throw std::runtime_error(errMsg); } - bool hasMonitorThreadCaughtError_; + bool hasMonitorThreadCaughtError_{false}; }; class ProcessGroupNCCLDebugInfoStuck @@ -238,7 +236,7 @@ class ProcessGroupNCCLDebugInfoStuck int rank, int size, c10::intrusive_ptr opts) - : ProcessGroupNCCLNoHeartbeatCaught(store, rank, size, opts) {} + : ProcessGroupNCCLNoHeartbeatCaught(store, rank, size, std::move(opts)) {} protected: // Override the heartbeat monitor function to set a long timeout to mimic the @@ -357,7 +355,7 @@ std::string readTraceFromFile(const std::string& filename, size_t size) { // Read the strings from the file if (file) { // While the file stream is in good state std::string str(size, '\0'); - file.read(&str[0], size); + file.read(&str[0], static_cast(size)); if (file) { return str; } @@ -368,7 +366,7 @@ std::string readTraceFromFile(const std::string& filename, size_t size) { // Extend the nested class outside the parent class class TestDebugInfoWriter : public c10d::DebugInfoWriter { public: - TestDebugInfoWriter(std::string namePrefix) + TestDebugInfoWriter(const std::string& namePrefix) : DebugInfoWriter(namePrefix, 0) {} void write(const std::string& ncclTrace) override { @@ -433,7 +431,7 @@ TEST_F(ProcessGroupNCCLErrorsTest, testNCCLErrorsNoHeartbeat) { EXPECT_TRUE(pg.getErrorCaughtFlag()); } work->wait(); - EXPECT_TRUE(traces.size() > 0); + EXPECT_TRUE(!traces.empty()); auto filename = c10::str(tempFilename, 0); auto traceFromStorage = readTraceFromFile(filename, traces.size()); // Check the traces read from storage match with the original nccl trace. diff --git a/test/cpp/c10d/ProcessGroupNCCLTest.cpp b/test/cpp/c10d/ProcessGroupNCCLTest.cpp index 7a03db5cf67e..fa586e74825f 100644 --- a/test/cpp/c10d/ProcessGroupNCCLTest.cpp +++ b/test/cpp/c10d/ProcessGroupNCCLTest.cpp @@ -21,15 +21,12 @@ using at::cuda::CUDAStream; class NCCLTestBase { public: NCCLTestBase( - const std::string& path, + std::string path, const std::chrono::milliseconds pgTimeout = c10d::kProcessGroupNCCLDefaultTimeout) - : path_(path), pgTimeout_(pgTimeout) {} + : path_(std::move(path)), pgTimeout_(pgTimeout) {} - NCCLTestBase(NCCLTestBase&& other) { - path_ = std::move(other.path_); - pg_ = std::move(other.pg_); - } + NCCLTestBase(NCCLTestBase&& other) noexcept = default; std::shared_ptr<::c10d::ProcessGroupNCCL> getProcessGroup() { return pg_; @@ -41,7 +38,7 @@ class NCCLTestBase { void initialize( int rank, - int size, + size_t size, std::optional<::std::shared_ptr<::c10d::ProcessGroupNCCL>> split_from = std::nullopt) { store_ = c10::make_intrusive<::c10d::FileStore>(path_, size); @@ -55,8 +52,8 @@ class NCCLTestBase { opts->split_color = ++color_; } #endif - pg_ = std::unique_ptr<::c10d::ProcessGroupNCCL>( - new ::c10d::ProcessGroupNCCL(store_, rank, size, std::move(opts))); + pg_ = std::make_unique<::c10d::ProcessGroupNCCL>( + store_, rank, size, std::move(opts)); } protected: @@ -76,10 +73,7 @@ class NCCLTest : public NCCLTestBase { std::chrono::milliseconds pgTimeout = c10d::kProcessGroupNCCLDefaultTimeout, int inputDim = 3) - : NCCLTestBase(path, pgTimeout), - numDevices_(1), // one device per rank (thread) - rank_(rank), - worldSize_(worldSize) { + : NCCLTestBase(path, pgTimeout), rank_(rank), worldSize_(worldSize) { // Each device has a single tensor to perf the NCCL op ::at::globalContext().lazyInitDevice(c10::DeviceType::CUDA); tensors_.resize(numDevices_); @@ -88,10 +82,10 @@ class 
NCCLTest : public NCCLTestBase { at::cuda::OptionalCUDAGuard deviceGuard; assert(numDevices_ == 1); for (const auto i : c10::irange(numDevices_)) { - deviceGuard.set_index(rank_); + deviceGuard.set_index(static_cast(rank_)); tensors_[i] = at::empty({inputDim, inputDim}, at::kCUDA); - inputs_[i].resize(worldSize_ * numDevices_); - outputs_[i].resize(worldSize_ * numDevices_); + inputs_[i].resize(static_cast(worldSize_) * numDevices_); + outputs_[i].resize(static_cast(worldSize_) * numDevices_); for (auto j = 0; j < worldSize_ * numDevices_; ++j) { inputs_[i][j] = at::empty({inputDim, inputDim}, at::kCUDA); outputs_[i][j] = at::empty({inputDim, inputDim}, at::kCUDA); @@ -106,7 +100,7 @@ class NCCLTest : public NCCLTestBase { // getters to retrieve the current stream). // // 1 device only, hence 1 stream only - deviceGuard.set_index(rank_); + deviceGuard.set_index(static_cast(rank_)); streams_.push_back(at::cuda::getStreamFromPool()); } @@ -148,7 +142,8 @@ class NCCLTest : public NCCLTestBase { std::vector>& tensor_lists) { std::vector> outputs(numDevices_); for (auto& output : outputs) { - output = std::vector(worldSize_ * numDevices_); + output = std::vector( + static_cast(worldSize_ * numDevices_)); } // For the duration of this function, make THC use our streams @@ -169,8 +164,8 @@ class NCCLTest : public NCCLTestBase { void launchDeviceSleep() { at::cuda::OptionalCUDAGuard deviceGuard; for (const auto i : c10::irange(numDevices_)) { - deviceGuard.set_index(rank_); - cudaSleep(streams_[i], 2000 * 1000 * 1000); + deviceGuard.set_index(static_cast(rank_)); + cudaSleep(streams_[i], 2000ull * 1000 * 1000); } } @@ -178,7 +173,7 @@ class NCCLTest : public NCCLTestBase { void valueInitialization() { at::cuda::OptionalCUDAGuard deviceGuard; for (const auto i : c10::irange(numDevices_)) { - deviceGuard.set_index(rank_); + deviceGuard.set_index(static_cast(rank_)); tensors_[i].fill_(pg_->getRank() * numDevices_ + i); } } @@ -199,14 +194,15 @@ class NCCLTest : public NCCLTestBase { void valueInitializationForSparse() { at::cuda::OptionalCUDAGuard deviceGuard; for (const auto i : c10::irange(numDevices_)) { - deviceGuard.set_index(rank_); + deviceGuard.set_index(static_cast(rank_)); tensors_[i].fill_(pg_->getRank() * numDevices_ + i + 1); // Convert the dense tensor to a sparse tensor in COO row format tensors_[i] = to_sparse_row_indices_format(tensors_[i]); } } - const int numDevices_; + // NOLINTNEXTLINE(cppcoreguidelines-avoid-const-or-ref-data-members) + const int numDevices_{1}; // one device per rank (thread) int rank_; int worldSize_; std::vector tensors_; @@ -374,7 +370,7 @@ class ReduceScatterBaseNCCLTest : public NCCLTest { ReduceScatterBaseNCCLTest(const std::string& path, int rank, int worldSize) : NCCLTest(path, rank, worldSize) { at::cuda::OptionalCUDAGuard deviceGuard; - deviceGuard.set_index(rank_); + deviceGuard.set_index(static_cast(rank_)); output_tensor_ = at::empty({1}, at::kCUDA); input_tensor_ = at::empty({worldSize}, at::kCUDA); for (const auto i : c10::irange(worldSize)) { @@ -755,7 +751,7 @@ class ProcessGroupNCCLTest : public ::testing::Test { std::vector threads; threads.reserve(size_); for (const auto rank : c10::irange(size_)) { - threads.emplace_back(std::thread(testFunc, file.path, rank, size_)); + threads.emplace_back(testFunc, file.path, rank, size_); } for (const auto rank : c10::irange(size_)) { threads[rank].join(); @@ -827,7 +823,7 @@ TEST_F(ProcessGroupNCCLTest, testBackendName) { } TemporaryFile file; auto test = NCCLTestBase(file.path); - 
test.initialize(/*rank=*/0, /*world_size=*/1); + test.initialize(/*rank=*/0, /*size=*/1); EXPECT_EQ( test.getProcessGroup()->getBackendName(), std::string(c10d::NCCL_BACKEND_NAME)); diff --git a/test/cpp/c10d/ProcessGroupUCCTest.cpp b/test/cpp/c10d/ProcessGroupUCCTest.cpp index a31e990536e1..84affb59cc2d 100644 --- a/test/cpp/c10d/ProcessGroupUCCTest.cpp +++ b/test/cpp/c10d/ProcessGroupUCCTest.cpp @@ -1,11 +1,9 @@ +#ifdef USE_C10D_UCC #include #include #include #include - -using namespace c10d; - TEST(ProcessGroupUCCTest, testTrim) { std::vector> tests = { {" allreduce ", "allreduce"}, @@ -13,7 +11,7 @@ TEST(ProcessGroupUCCTest, testTrim) { {"send\n", "send"}, }; for (auto entry : tests) { - ASSERT_EQ(trim(entry.first), entry.second); + ASSERT_EQ(c10d::trim(entry.first), entry.second); } } @@ -24,12 +22,13 @@ TEST(ProcessGroupUCCTest, testToLower) { {"send", "send"}, }; for (auto entry : tests) { - ASSERT_EQ(tolower(entry.first), entry.second); + ASSERT_EQ(c10d::tolower(entry.first), entry.second); } } TEST(ProcessGroupUCCTest, testParseList) { std::string input = "\tAllReduce, ALLGATHER, send\n"; std::vector expect{"allreduce", "allgather", "send"}; - ASSERT_EQ(parse_list(input), expect); + ASSERT_EQ(c10d::parse_list(input), expect); } +#endif diff --git a/test/cpp/c10d/TCPStoreTest.cpp b/test/cpp/c10d/TCPStoreTest.cpp index 7351984f36c9..d68b547240f8 100644 --- a/test/cpp/c10d/TCPStoreTest.cpp +++ b/test/cpp/c10d/TCPStoreTest.cpp @@ -2,10 +2,7 @@ #include "StoreTestCommon.hpp" #include -#include -#include #include -#include #include #include @@ -104,33 +101,32 @@ void testHelper(bool useLibUV, const std::string& prefix = "") { std::to_string(numThreads * numIterations + 1); for (const auto i : c10::irange(numThreads)) { - threads.emplace_back( - std::thread([=, &sem1, &sem2, &clientStores, &expectedCounterRes] { - for (C10_UNUSED const auto j : c10::irange(numIterations)) { - clientStores[i]->add("counter", 1); - } - // Let each thread set and get key on its client store - std::string key = "thread_" + std::to_string(i); - for (const auto j : c10::irange(numIterations)) { - std::string val = "thread_val_" + std::to_string(j); - c10d::test::set(*clientStores[i], key, val); - c10d::test::check(*clientStores[i], key, val); - } + threads.emplace_back([=, &sem1, &sem2, &clientStores, &expectedCounterRes] { + for (C10_UNUSED const auto j : c10::irange(numIterations)) { + clientStores[i]->add("counter", 1); + } + // Let each thread set and get key on its client store + std::string key = "thread_" + std::to_string(i); + for (const auto j : c10::irange(numIterations)) { + std::string val = "thread_val_" + std::to_string(j); + c10d::test::set(*clientStores[i], key, val); + c10d::test::check(*clientStores[i], key, val); + } - sem1.post(); - sem2.wait(); - // Check the counter results - c10d::test::check(*clientStores[i], "counter", expectedCounterRes); - // Now check other threads' written data - for (const auto j : c10::irange(numThreads)) { - if (j == i) { - continue; - } - std::string key = "thread_" + std::to_string(i); - std::string val = "thread_val_" + std::to_string(numIterations - 1); - c10d::test::check(*clientStores[i], key, val); - } - })); + sem1.post(); + sem2.wait(); + // Check the counter results + c10d::test::check(*clientStores[i], "counter", expectedCounterRes); + // Now check other threads' written data + for (const auto j : c10::irange(numThreads)) { + if (j == i) { + continue; + } + std::string key = "thread_" + std::to_string(i); + std::string val = "thread_val_" + 
std::to_string(numIterations - 1); + c10d::test::check(*clientStores[i], key, val); + } + }); } sem1.wait(numThreads); diff --git a/torch/csrc/distributed/c10d/Backoff.cpp b/torch/csrc/distributed/c10d/Backoff.cpp index a0ef2ba0b8b3..6aadc33cbc5e 100644 --- a/torch/csrc/distributed/c10d/Backoff.cpp +++ b/torch/csrc/distributed/c10d/Backoff.cpp @@ -1,13 +1,12 @@ #include -#include #include namespace c10d { namespace { constexpr std::chrono::milliseconds kZeroInterval{0}; -int32_t randSeed() { +std::random_device::result_type randSeed() { std::random_device rd; return rd(); } diff --git a/torch/csrc/distributed/c10d/Functional.cpp b/torch/csrc/distributed/c10d/Functional.cpp index 5c62849f841e..1117718ee509 100644 --- a/torch/csrc/distributed/c10d/Functional.cpp +++ b/torch/csrc/distributed/c10d/Functional.cpp @@ -418,7 +418,7 @@ class AllToAllSingle : public torch::autograd::Function { static torch::autograd::variable_list backward( torch::autograd::AutogradContext* ctx, - torch::autograd::variable_list grad_out_list) { + const torch::autograd::variable_list& grad_out_list) { const std::vector& output_split_sizes = ctx->saved_data["output_split_sizes"].toIntVector(); const std::vector& input_split_sizes = @@ -476,12 +476,12 @@ class ReduceScatterTensor static torch::autograd::variable_list backward( torch::autograd::AutogradContext* ctx, - torch::autograd::variable_list grad_out_list) { + const torch::autograd::variable_list& grad_out_list) { const int64_t group_size = ctx->saved_data["group_size"].toInt(); const std::string& group_name = ctx->saved_data["group_name"].toStringRef(); DCHECK(grad_out_list.size() == 1); - auto grad_out = grad_out_list[0]; + const auto& grad_out = grad_out_list[0]; auto out = c10::Dispatcher::singleton() @@ -532,12 +532,12 @@ class AllGatherIntoTensor static torch::autograd::variable_list backward( torch::autograd::AutogradContext* ctx, - torch::autograd::variable_list grad_out_list) { + const torch::autograd::variable_list& grad_out_list) { const int64_t group_size = ctx->saved_data["group_size"].toInt(); const std::string& group_name = ctx->saved_data["group_name"].toStringRef(); DCHECK(grad_out_list.size() == 1); - auto grad_out = grad_out_list[0]; + const auto& grad_out = grad_out_list[0]; auto out = c10::Dispatcher::singleton() diff --git a/torch/csrc/distributed/c10d/NCCLUtils.cpp b/torch/csrc/distributed/c10d/NCCLUtils.cpp index b3f6acb62598..f8b838f21c4d 100644 --- a/torch/csrc/distributed/c10d/NCCLUtils.cpp +++ b/torch/csrc/distributed/c10d/NCCLUtils.cpp @@ -2,9 +2,8 @@ #include #include -#include #include -#include +#include #ifdef USE_C10D_NCCL #include @@ -47,7 +46,7 @@ void NCCLComm::waitUntilInitialized(int timeoutSecs) { auto startTimepoint = std::chrono::steady_clock::now(); while (!initialized_) { if (ncclComm_) { - ncclResult_t result; + ncclResult_t result{}; ncclCommGetAsyncError(ncclComm_, &result); if (result == ncclSuccess) { LOG(INFO) << "Rank " << rank_ << ": NCCL communicator is initialized."; @@ -98,7 +97,7 @@ std::string getNcclVersion() { static std::string versionString; c10::call_once(ncclGetVersionFlag, []() { - int version; + int version = 0; ncclResult_t status = ncclGetVersion(&version); // can't compute the version if call did not return successfully or version // code < 100 (corresponding to 0.1.0) @@ -116,7 +115,7 @@ std::string getNcclVersion() { std::to_string(ncclMinor) + "." 
+ std::to_string(ncclPatch); #ifdef NCCL_SUFFIX const auto ncclSuffix = std::string(NCCL_SUFFIX); - if (ncclSuffix.length()) { + if (!ncclSuffix.empty()) { versionString += "." + ncclSuffix; } #endif @@ -134,16 +133,14 @@ size_t hashTensors(const std::vector& tensors) { size_t data_size = tensor.storage().nbytes(); if (data_size > 0 && tensor.storage().data_ptr()) { auto src = static_cast(tensor.storage().data_ptr().get()); - char* dst = (char*)std::calloc(data_size, sizeof(char)); + std::vector dst(data_size); // This is needed so that we trigger a device synchronization so we can // get the collective finished if launched on GPU and hash its output. - cudaMemcpy(dst, src, data_size, cudaMemcpyDeviceToHost); + cudaMemcpy(dst.data(), src, data_size, cudaMemcpyDeviceToHost); for (size_t i = 0; i < data_size; ++i) { // Update the hash for each byte in the tensor - hash = c10::hash_combine( - hash, c10::get_hash(((char*)dst)[i], data_size)); + hash = c10::hash_combine(hash, c10::get_hash(dst[i], data_size)); } - free(dst); } } } @@ -199,7 +196,7 @@ std::string getNcclErrorDetailStr( std::string interpret; std::string err; #ifdef ENABLE_NCCL_GET_LAST_ERROR - auto ret = ncclGetLastError(NULL); + auto ret = ncclGetLastError(nullptr); if (ret) { err = "\nLast error:\n" + std::string(ret); } else { @@ -244,7 +241,7 @@ std::string getNcclErrorDetailStr( control_plane::RegisterHandler dumpHandler{ "dump_nccl_trace_pickle", [](const control_plane::Request& req, control_plane::Response& res) { - const auto params = req.params(); + const auto& params = req.params(); size_t validParamCount = 0; // valid params @@ -292,7 +289,7 @@ control_plane::RegisterHandler dumpHandler{ control_plane::RegisterHandler jsonDumpHandler{ "dump_nccl_trace_json", [](const control_plane::Request& req, control_plane::Response& res) { - const auto params = req.params(); + const auto& params = req.params(); size_t validParamCount = 0; // valid params @@ -347,6 +344,11 @@ void DebugInfoWriter::write(const std::string& ncclTrace) { } file.write(ncclTrace.data(), ncclTrace.size()); + if (!file) { + LOG(ERROR) << "Error opening file for writing NCCLPG debug info: " + << filename_; + return; + } LOG(INFO) << "Finished writing NCCLPG debug info to " << filename_; } @@ -391,7 +393,7 @@ std::optional NCCLTraceBuffer::record( } if (all_pg_status_.find(pg_id) == all_pg_status_.end()) { // Current pg_status is not in FR. 
- all_pg_status_[pg_id] = pg_status; + all_pg_status_[pg_id] = std::move(pg_status); } auto traceback = torch::CapturedTraceback::gather(true, true, capture_cpp_stack_); @@ -406,8 +408,8 @@ std::optional NCCLTraceBuffer::record( op_id, std::move(profiling_name), std::move(traceback), - std::move(start), - std::move(end), + start, + end, c10::getTime(), timeout_ms.count(), isP2P, @@ -424,14 +426,14 @@ std::optional NCCLTraceBuffer::record( for (const auto& input : inputs) { c10::IntArrayRef sizes = input.sizes(); te.input_dtypes_.push_back(input.dtype().toScalarType()); - te.input_dims_.push_back(sizes.size()); + te.input_dims_.push_back(static_cast(sizes.size())); te.sizes_.insert(te.sizes_.end(), sizes.begin(), sizes.end()); } for (const auto& output : outputs) { c10::IntArrayRef sizes = output.sizes(); te.output_dtypes_.push_back(output.dtype().toScalarType()); - te.output_dims_.push_back(sizes.size()); + te.output_dims_.push_back(static_cast(sizes.size())); te.sizes_.insert(te.sizes_.end(), sizes.begin(), sizes.end()); } @@ -453,7 +455,7 @@ void NCCLTraceBuffer::record_pg_ranks( return; } std::lock_guard guard(mutex_); - pg_name_to_ranks_[pg_name] = ranks; + pg_name_to_ranks_[pg_name] = std::move(ranks); } void NCCLTraceBuffer::update_state(Entry& r) { @@ -475,8 +477,14 @@ std::vector NCCLTraceBuffer::dump_entries() { std::lock_guard guard(mutex_); std::vector result; result.reserve(entries_.size()); - result.insert(result.end(), entries_.begin() + next_, entries_.end()); - result.insert(result.end(), entries_.begin(), entries_.begin() + next_); + result.insert( + result.end(), + entries_.begin() + static_cast(next_), + entries_.end()); + result.insert( + result.end(), + entries_.begin(), + entries_.begin() + static_cast(next_)); // query any remaining events for (auto& r : result) { update_state(r); @@ -566,7 +574,7 @@ const c10::List NCCLTraceBuffer::getCollectiveTrace( if (includeStacktraces) { auto& tb = stracebacks.tracebacks.at(i); auto frames = new_list(); - for (int64_t frame : tb) { + for (auto frame : tb) { frames.push_back(all_frames.at(frame)); } dict.insert(frames_key, frames); @@ -585,7 +593,7 @@ const c10::List NCCLTraceBuffer::getCollectiveTrace( } auto it = e.sizes_.begin(); - auto read_sizes = [&](const c10::SmallVector& dims) { + auto read_sizes = [&](const c10::SmallVector& dims) { auto sizes = new_list(); for (auto dim : dims) { auto arg_sizes = new_list(); @@ -601,14 +609,14 @@ const c10::List NCCLTraceBuffer::getCollectiveTrace( std::vector input_dtypes_strs; input_dtypes_strs.reserve(e.input_dtypes_.size()); for (const auto& input_dtype : e.input_dtypes_) { - input_dtypes_strs.push_back(c10::toString(input_dtype)); + input_dtypes_strs.emplace_back(c10::toString(input_dtype)); } dict.insert(input_dtypes_key, input_dtypes_strs); dict.insert(output_sizes_key, read_sizes(e.output_dims_)); std::vector output_dtypes_strs; output_dtypes_strs.reserve(e.output_dtypes_.size()); for (const auto& output_dtype : e.output_dtypes_) { - output_dtypes_strs.push_back(c10::toString(output_dtype)); + output_dtypes_strs.emplace_back(c10::toString(output_dtype)); } dict.insert(output_dtypes_key, output_dtypes_strs); if (e.time_discovered_completed_.has_value()) { @@ -723,10 +731,10 @@ std::string NCCLTraceBuffer::dump_json( j[duration_key_str] = *e.duration_; } auto it = e.sizes_.begin(); - auto read_sizes = [&](const c10::SmallVector& dims) { - auto sizes = std::list>(); + auto read_sizes = [&](const c10::SmallVector& dims) { + auto sizes = std::list>(); for (auto dim : dims) { - 
auto arg_sizes = std::list(); + auto arg_sizes = std::list(); for (auto i : c10::irange(dim)) { (void)i; arg_sizes.push_back(*it++); @@ -739,14 +747,14 @@ std::string NCCLTraceBuffer::dump_json( std::vector input_dtypes_strs; input_dtypes_strs.reserve(e.input_dtypes_.size()); for (const auto& input_dtype : e.input_dtypes_) { - input_dtypes_strs.push_back(c10::toString(input_dtype)); + input_dtypes_strs.emplace_back(c10::toString(input_dtype)); } j[input_dtypes_key_str] = input_dtypes_strs; j[output_sizes_key_str] = read_sizes(e.output_dims_); std::vector output_dtypes_strs; output_dtypes_strs.reserve(e.output_dtypes_.size()); for (const auto& output_dtype : e.output_dtypes_) { - output_dtypes_strs.push_back(c10::toString(output_dtype)); + output_dtypes_strs.emplace_back(c10::toString(output_dtype)); } j[output_dtypes_key_str] = output_dtypes_strs; if (e.time_discovered_completed_.has_value()) { @@ -770,7 +778,7 @@ std::string NCCLTraceBuffer::dump_json( entries.emplace_back(j); } - if (entries.size() > 0) { + if (!entries.empty()) { result[entries_key_str] = entries; } } @@ -811,7 +819,7 @@ std::string NCCLTraceBuffer::dump( per_comm_dict.insert(ncclId, inner_dict); } } - if (per_comm_dict.size() > 0) { + if (!per_comm_dict.empty()) { result.insert(nccl_comm_key, per_comm_dict); } return pickle_str(result); diff --git a/torch/csrc/distributed/c10d/NCCLUtils.hpp b/torch/csrc/distributed/c10d/NCCLUtils.hpp index b1869c73903c..ef924e91c5e4 100644 --- a/torch/csrc/distributed/c10d/NCCLUtils.hpp +++ b/torch/csrc/distributed/c10d/NCCLUtils.hpp @@ -636,9 +636,9 @@ struct NCCLTraceBuffer { std::optional time_discovered_completed_; // size information for input/output tensors - c10::SmallVector input_dims_; + c10::SmallVector input_dims_; std::vector input_dtypes_; - c10::SmallVector output_dims_; + c10::SmallVector output_dims_; std::vector output_dtypes_; c10::SmallVector sizes_; // flattened from inputs, outputs bool retired_ = false; // is this work entry no longer in the workMetaList_? diff --git a/torch/csrc/distributed/c10d/ProcessGroupNCCL.cpp b/torch/csrc/distributed/c10d/ProcessGroupNCCL.cpp index 83bae62684fb..bfafaabdad19 100644 --- a/torch/csrc/distributed/c10d/ProcessGroupNCCL.cpp +++ b/torch/csrc/distributed/c10d/ProcessGroupNCCL.cpp @@ -2,13 +2,11 @@ #include #include -#include #include #include #include #include #include -#include #include #include @@ -87,7 +85,7 @@ ncclDataType_t getNcclDataType(at::ScalarType type) { return it->second; } -bool complexViewAsRealAllowed(const ReduceOp reduceOp) { +bool complexViewAsRealAllowed(const ReduceOp& reduceOp) { switch (reduceOp) { case ReduceOp::SUM: return true; @@ -110,7 +108,7 @@ ncclRedOpRAII unpackPreMulSum( const ncclComm_t& comm) { const auto* preMulSupplement = reinterpret_cast(reduceOp.supplement_.get()); - ncclRedOp_t preMulSum; + ncclRedOp_t preMulSum{}; bool has_tensor = preMulSupplement->tensor_factor.defined(); auto residence = has_tensor ? 
ncclScalarDevice : ncclScalarHostImmediate; const T* ptr_factor = has_tensor @@ -161,8 +159,7 @@ ncclRedOpRAII getNcclReduceOp( default: C10_THROW_ERROR( TypeError, "PreMulSum Data type must be half, float, or double"); - ncclRedOp_t unused; - return unused; + return ncclRedOp_t{}; } #else C10_THROW_ERROR(ValueError, "PreMulSum requires NCCL>=2.11.1"); @@ -260,7 +257,7 @@ std::string buildNcclUniqueIdStr(const ncclUniqueId& ncclID) { return oss.str(); } -std::string getNcclAbortedCommStoreKey(const std::string ncclIdStr) { +std::string getNcclAbortedCommStoreKey(const std::string& ncclIdStr) { return std::string(kNCCLAbortedCommStoreKey) + ":" + ncclIdStr; } @@ -513,8 +510,8 @@ std::ostream& operator<<( } ProcessGroupNCCL::WorkNCCL::WorkNCCL( - const std::string& pgUID, - const std::string& pgDesc, + std::string pgUID, + std::string pgDesc, at::Device& device, int rank, OpType opType, @@ -527,8 +524,8 @@ ProcessGroupNCCL::WorkNCCL::WorkNCCL( bool cudaEventCacheEnabled, DebugLevel distDebugLevel) : Work(rank, opType, profilingTitle, inputs), - pgUID_(pgUID), - pgDesc_(pgDesc), + pgUID_(std::move(pgUID)), + pgDesc_(std::move(pgDesc)), device_(device), workStartTime_(std::chrono::steady_clock::now()), seq_(seq), @@ -622,7 +619,7 @@ const std::string& ProcessGroupNCCL::WorkNCCL::logPrefix() const { void ProcessGroupNCCL::WorkNCCL::setException( std::exception_ptr exception_ptr) { std::unique_lock lock(mutex_); - exception_ = exception_ptr; + exception_ = std::move(exception_ptr); } // Helper that checks if the NCCL kernels are completed on the GPUs @@ -805,7 +802,7 @@ void ProcessGroupNCCL::WorkNCCL::abort() { ncclCommDevIdxMapMutex.unlock(); } -ProcessGroupNCCL::CUDAEventCache::CUDAEventCache() {} +ProcessGroupNCCL::CUDAEventCache::CUDAEventCache() = default; // CUDA event is used to record the start/end of one Work. // Instead of let the CUDA event gets destroyed, we now reuse it after the Work @@ -854,8 +851,8 @@ ProcessGroupNCCL::ProcessGroupNCCL( c10::intrusive_ptr options) : Backend(rank, size), store_(store), - options_(options), - ncclCommCounter_(0), + options_(std::move(options)), + traceKeyStart_(getTraceStartKey("NCCL", rank)), traceKeyEnd_(getTraceEndKey("NCCL", rank)), terminateProcessGroup_(false), @@ -1154,7 +1151,7 @@ void ProcessGroupNCCL::waitForFutureOrTimeout( ::c10d::C10dLoggingData data; if (log) { - data.integers["pg_id"] = local_id_; + data.integers["pg_id"] = static_cast(local_id_); data.integers["rank"] = rank_; data.integers["global_rank"] = globalRank(); data.strings["flight_recorder_version"] = c10d::version_val_str; @@ -1221,7 +1218,7 @@ void ProcessGroupNCCL::waitForFutureOrTimeout( void ProcessGroupNCCL::abortCommsFromMap( std::unordered_map>& ncclCommsMap, - std::optional abortReason) { + const std::optional& abortReason) { // The process may control multiple devices, loop through the communicators on // each device for (auto& it : ncclCommsMap) { @@ -1255,7 +1252,7 @@ void ProcessGroupNCCL::abortCommsFromMap( } // Abort all communicators on this rank -bool ProcessGroupNCCL::abort(std::optional abortReason) { +bool ProcessGroupNCCL::abort(const std::optional& abortReason) { // This will log counter for how long the abort actually takes. 
STATIC_SCOPED_WAIT_COUNTER(pytorch.ProcessGroupNCCL__abort); // Remove record from global ncclCommDevIdxMapMutex before aboarting, @@ -1276,7 +1273,7 @@ bool ProcessGroupNCCL::abort(std::optional abortReason) { return true; } -void ProcessGroupNCCL::shutdown(std::optional reason) { +void ProcessGroupNCCL::shutdown(const std::optional& reason) { // Don't join threads here since the purpose of this method is to abort all // communicators and signal the threads to exit. Joining on the threads could // potentially block and hence avoid it in this method. @@ -1357,13 +1354,13 @@ bool ProcessGroupNCCL::dumpDebuggingInfo() { return false; } -void ProcessGroupNCCL::terminateProcess(std::string errMsg) { +void ProcessGroupNCCL::terminateProcess(const std::string& errMsg) { // Logging with `FATAL`, after errMsg printed, it calls `std::abort()` // to terminate the program execution. LOG(FATAL) << logPrefix() << errMsg; } -int computeDeltaMS( +long computeDeltaMS( std::chrono::time_point start, std::chrono::time_point end) { return std::chrono::duration_cast(end - start) @@ -1752,7 +1749,7 @@ void ProcessGroupNCCL::addEphemeralTimeout( } bool ProcessGroupNCCL::verifyWorkTimeoutForTest( - const c10::intrusive_ptr work, + const c10::intrusive_ptr& work, const std::chrono::milliseconds& timeout) { // Since collective returns a c10d::Work, we need to cast it to WorkNCCL. if (auto workNCCL = c10::dynamic_intrusive_pointer_cast(work)) { @@ -1936,7 +1933,7 @@ void ProcessGroupNCCL::watchdogHandler() { // multiple times after the start if (pgStatus_->lastStartedSeq < static_cast(work.seq_) && work.isStarted()) { - pgStatus_->lastStartedSeq = work.seq_; + pgStatus_->lastStartedSeq = static_cast(work.seq_); pgStatus_->lastStartedWorkName = opTypeToString(work.opType_); } @@ -1950,7 +1947,7 @@ void ProcessGroupNCCL::watchdogHandler() { ephemeralTimeoutInflight_ -= work.ownedEphermeralTimeout_; } } - pgStatus_->lastCompletedSeq = work.seq_; + pgStatus_->lastCompletedSeq = static_cast(work.seq_); pgStatus_->lastCompletedWorkName = opTypeToString(work.opType_); pgStatus_->lastCompletedNumelIn = work.numelIn_; pgStatus_->lastCompletedNumelOut = work.numelOut_; @@ -2259,7 +2256,7 @@ std::shared_ptr ProcessGroupNCCL::getNCCLComm( } // GPU world size and GPU rank - int numRanks, rank; + int numRanks = -1, rank = -1; if (!singleP2POp) { // Collective, all-to-all, or batch P2P @@ -2372,7 +2369,7 @@ std::shared_ptr ProcessGroupNCCL::getNCCLComm( C10D_NCCL_CHECK(ncclGroupStart(), std::nullopt); } - ncclStreams_.emplace(deviceKey, std::move(streamVal)); + ncclStreams_.emplace(deviceKey, streamVal); // Note: these events are created with the (default) cudaEventDisableTiming // flag This flag provides the best performance when used with @@ -2461,7 +2458,7 @@ void check_gpu_single_tensor( // condition may be a challenge because the test would need to pass tensors on // different devices in the same process. int64_t check_gpu_tensors_same_device(const std::vector& tensors) { - if (tensors.size() == 0) { + if (tensors.empty()) { C10_THROW_ERROR(ValueError, "Tensor list must be nonempty"); } @@ -2600,7 +2597,7 @@ void ProcessGroupNCCL::assignTimeoutToWork( } void ProcessGroupNCCL::workEnqueue( - c10::intrusive_ptr work) { + const c10::intrusive_ptr& work) { if (!terminateProcessGroup_.load()) { std::lock_guard lock(workMetaListMutex_); // Avoid view tensors to be processed in cleanup thread. 
@@ -4265,7 +4262,8 @@ c10::intrusive_ptr ProcessGroupNCCL::barrier(const BarrierOptions& opts) {
       ValueError,
       barDevIdx >= 0,
       "Failed to infer a GPU device id to perform barrier. ");
-  auto barDevice = at::Device(at::DeviceType::CUDA, barDevIdx);
+  auto barDevice = at::Device(
+      at::DeviceType::CUDA, static_cast(barDevIdx));
 
   // Create a dummy tensor on the device
   // Note: we use zeros() instead of empty() to prevent barrier from triggering
@@ -4291,7 +4289,7 @@ c10::intrusive_ptr ProcessGroupNCCL::alltoall_base(
     const AllToAllOptions& /* unused */) {
   check_gpu_single_tensor(outputTensor, true);
   check_gpu_single_tensor(inputTensor, true);
-  if (outputSplitSizes.size() == 0 && inputSplitSizes.size() == 0) {
+  if (outputSplitSizes.empty() && inputSplitSizes.empty()) {
     RECORD_PARAM_COMMS_DATA(
         std::make_tuple(
             static_cast(seqCollective_) + 1,
@@ -4553,7 +4551,8 @@ void ProcessGroupNCCL::groupEnd() {
   --ncclActiveGroupCounter_;
 }
 
-void ProcessGroupNCCL::groupEndNonblocking(std::shared_ptr comm) {
+void ProcessGroupNCCL::groupEndNonblocking(
+    const std::shared_ptr& comm) {
 #ifndef NCCL_HAS_COMM_NONBLOCKING
   C10D_NCCL_CHECK(ncclGroupEnd(), std::nullopt);
 #else
@@ -4602,7 +4601,7 @@ c10::intrusive_ptr ProcessGroupNCCL::gather(
     outputs = outputTensors[0];
   } else {
     // if not in the root rank, initialize outputs as empty list
-    if (outputTensors.size() != 0) {
+    if (!outputTensors.empty()) {
       invalidArgument("requires empty output on non-root");
     }
     outputs = {};
@@ -4643,13 +4642,14 @@ c10::intrusive_ptr ProcessGroupNCCL::gather(
         const auto root = opts.rootRank;
         if (getRank() == root) {
           if (!avoidRecordStreams_) {
-            for (auto output : outputs) {
+            for (auto const& output : outputs) {
               c10::cuda::CUDACachingAllocator::recordStream(
                   output.storage().data_ptr(), stream);
             }
           }
         }
-        torch::cuda::nccl::gather(inputTensor, outputs, comm, stream, root);
+        torch::cuda::nccl::gather(
+            inputTensor, outputs, comm, stream, static_cast(root));
         return ncclSuccess;
       },
       [](at::cuda::CUDAStream&,
@@ -4696,7 +4696,7 @@ c10::intrusive_ptr ProcessGroupNCCL::scatter(
   } else {
     // if not in the root rank, initialize inputTensors as empty place holder
     // with an empty list
-    if (inputTensors.size() != 0) {
+    if (!inputTensors.empty()) {
      invalidArgument("requires empty input on non-root");
    }
    inputs = {};
@@ -4740,13 +4740,14 @@ c10::intrusive_ptr ProcessGroupNCCL::scatter(
           at::cuda::CUDAStream& stream) {
         if (getRank() == root) {
           if (!avoidRecordStreams) {
-            for (auto input : inputs) {
+            for (auto const& input : inputs) {
               c10::cuda::CUDACachingAllocator::recordStream(
                   input.storage().data_ptr(), stream);
             }
           }
         }
-        torch::cuda::nccl::scatter(inputs, outputTensor, comm, stream, root);
+        torch::cuda::nccl::scatter(
+            inputs, outputTensor, comm, stream, static_cast(root));
         return ncclSuccess;
       },
       [](at::cuda::CUDAStream&,
diff --git a/torch/csrc/distributed/c10d/ProcessGroupNCCL.hpp b/torch/csrc/distributed/c10d/ProcessGroupNCCL.hpp
index 284cd4a9a2ef..428357fc9089 100644
--- a/torch/csrc/distributed/c10d/ProcessGroupNCCL.hpp
+++ b/torch/csrc/distributed/c10d/ProcessGroupNCCL.hpp
@@ -272,8 +272,8 @@ class TORCH_API ProcessGroupNCCL : public Backend {
 
    // Constructor takes a list of CUDA devices
    WorkNCCL(
-       const std::string& pgUID,
-       const std::string& pgDesc,
+       std::string pgUID,
+       std::string pgDesc,
        at::Device& device,
        int rank,
        OpType opType,
@@ -373,7 +373,7 @@ class TORCH_API ProcessGroupNCCL : public Backend {
    bool avoidRecordStreams_{false};
 
    // Clone of opTimeout_ from ProcessGroupNCCL.
-   std::chrono::milliseconds opTimeout_;
+   std::chrono::milliseconds opTimeout_{};
 
    // Ephemeral timeouts are owned by exactly one work,
    // and reset after that work completes.
@@ -457,7 +457,7 @@ class TORCH_API ProcessGroupNCCL : public Backend {
 
   private:
    std::mutex cacheMutex_;
-   // NOTE: We intentionaly store raw pointers so that
+   // NOTE: We intentionally store raw pointers so that
    // we do not attempt to destroy the event objects on process exit,
    // because cuda may be gone.
    std::vector
@@ -520,7 +520,7 @@ class TORCH_API ProcessGroupNCCL : public Backend {
       int size,
       const std::string& groupName,
       c10::intrusive_ptr options = Options::create())
-      : ProcessGroupNCCL(store, rank, size, options) {}
+      : ProcessGroupNCCL(store, rank, size, std::move(options)) {}
 
   ~ProcessGroupNCCL() override;
 
@@ -643,7 +643,7 @@ class TORCH_API ProcessGroupNCCL : public Backend {
 
   void groupEnd();
 
-  void groupEndNonblocking(std::shared_ptr comm);
+  void groupEndNonblocking(const std::shared_ptr& comm);
 
   c10::intrusive_ptr gather(
       std::vector>& outputTensors,
@@ -682,16 +682,16 @@ class TORCH_API ProcessGroupNCCL : public Backend {
   // Helper function for iteratively aborting communicators in the provided map
   void abortCommsFromMap(
       std::unordered_map>& ncclCommsMap,
-      std::optional abortReason);
+      const std::optional& abortReason);
 
   c10::intrusive_ptr initIntraNodeComm();
 
   // Provides an API to abort the ProcessGroup (similar to ncclCommAbort)
   // instead of relying on ProcessGroupNCCL destructor.
   // return true if abort is successful, otherwise false
-  bool abort(std::optional abortReason = std::nullopt);
+  bool abort(const std::optional& abortReason = std::nullopt);
 
-  void shutdown(std::optional reason = std::nullopt);
+  void shutdown(const std::optional& reason = std::nullopt);
 
   void eagerConnectSingleDevice(at::Device device) override;
 
@@ -712,7 +712,7 @@ class TORCH_API ProcessGroupNCCL : public Backend {
   // `opTimeout_` of the provided WorkNCCL instance is the same as the specified
   // timeout.
   bool verifyWorkTimeoutForTest(
-      const c10::intrusive_ptr work,
+      const c10::intrusive_ptr& work,
       const std::chrono::milliseconds& timeout);
 
  protected:
@@ -903,7 +903,7 @@ class TORCH_API ProcessGroupNCCL : public Backend {
 
   // Function that directly trigger std::abort so that the whole process
   // gets terminated.
-  virtual void terminateProcess(std::string errMsg);
+  virtual void terminateProcess(const std::string& errMsg);
 
   // A helper function to wait for a future to complete or timeout.
   void waitForFutureOrTimeout(
@@ -1089,7 +1089,7 @@ class TORCH_API ProcessGroupNCCL : public Backend {
   std::list completedWorkList_;
 
   // Add Work Pointer to workVector
-  void workEnqueue(c10::intrusive_ptr);
+  void workEnqueue(const c10::intrusive_ptr&);
 
   // The CUDA streams used by NCCL kernels
   std::unordered_map ncclStreams_;
diff --git a/torch/csrc/distributed/c10d/TraceUtils.h b/torch/csrc/distributed/c10d/TraceUtils.h
index 9684ebe468a8..b211fc83564a 100644
--- a/torch/csrc/distributed/c10d/TraceUtils.h
+++ b/torch/csrc/distributed/c10d/TraceUtils.h
@@ -8,13 +8,10 @@
 #include 
 #include 
 #include 
-#include 
 #include 
 #include 
-#include 
 #include 
-#include 
 #include 
 
 namespace c10d {
 
@@ -27,7 +24,7 @@ struct ProcessGroupStatus {
   int64_t lastEnqueuedSeq{-1};
   // the sequential number of the last collective started as the kernel
   int64_t lastStartedSeq{-1};
-  // the sequential number of the last colletive completed marked by
+  // the sequential number of the last collective completed marked by
   // the watchdog thread
   // initialized to be -1 to indicate no collective has been completed
   int64_t lastCompletedSeq{-1};
@@ -129,7 +126,7 @@ inline std::string analyzeLaggingRanks(const TraceMap& traceMap) {
   std::string report =
       "\n\t - To our best knowledge, the lagging/dead/mismatched ranks "
       "that caused the desync are:";
-  if (startRanks.size()) {
+  if (!startRanks.empty()) {
     report += c10::str(
         "\n\t - [",
         ranksToString(startRanks),
@@ -137,7 +134,7 @@ inline std::string analyzeLaggingRanks(const TraceMap& traceMap) {
         lagSeq,
         " (count from 1)");
   }
-  if (endRanks.size()) {
+  if (!endRanks.empty()) {
     report += c10::str(
         "\n\t [",
         ranksToString(endRanks),
@@ -169,7 +166,7 @@ inline std::string dumpSnapshot(TraceMap& traceMap) {
       }
     }
 
-    if (collectivesStart.size()) {
+    if (!collectivesStart.empty()) {
      report += c10::str("\n\t #", seq, " started ranks:");
      for (auto& mapPair : collectivesStart) {
        report += c10::str(
@@ -179,7 +176,7 @@ inline std::string dumpSnapshot(TraceMap& traceMap) {
            mapPair.first);
      }
    }
-    if (collectivesEnd.size()) {
+    if (!collectivesEnd.empty()) {
      report += c10::str("\n\t #", seq, " finished ranks:");
      for (auto& mapPair : collectivesEnd) {
        report += c10::str(
@@ -218,7 +215,7 @@ inline std::string retrieveDesyncReport(
     int worldSize) {
   std::string report;
 
-  uint64_t thisSeq;
+  uint64_t thisSeq = 0;
   std::string thisCol;
 
   std::vector missingRanks;
@@ -226,7 +223,7 @@ inline std::string retrieveDesyncReport(
 
   for (const auto rank : c10::irange(worldSize)) {
     // Build traceMapStart.
-    uint64_t seqStart;
+    uint64_t seqStart = 0;
     {
       std::string traceKeyStart = getTraceStartKey(pgName, rank);
       if (!store->check({traceKeyStart})) {
@@ -250,7 +247,7 @@ inline std::string retrieveDesyncReport(
       if (!store->check({traceKeyEnd})) {
         continue;
       }
-      uint64_t seq;
+      uint64_t seq = 0;
       std::string col;
       if (!parseTraceValue(store, traceKeyEnd, seq, col)) {
         return report;
@@ -323,7 +320,7 @@ inline std::string get_python_cpp_trace() {
     auto frame_id = s_tb[idx];
     const auto& frame = s_tbs.all_frames.at(frame_id);
     oss << "#" << idx << " " << frame.funcname << " from " << frame.filename
-        << ":" << frame.lineno << std::endl;
+        << ":" << frame.lineno << '\n';
   }
   return oss.str();
 }
diff --git a/torch/csrc/distributed/c10d/c10d.h b/torch/csrc/distributed/c10d/c10d.h
index 5151a33f7ee3..4f1f92af9976 100644
--- a/torch/csrc/distributed/c10d/c10d.h
+++ b/torch/csrc/distributed/c10d/c10d.h
@@ -2,12 +2,8 @@
 
 #include 
 
-namespace torch {
-namespace distributed {
-namespace c10d {
+namespace torch::distributed::c10d {
 
 PyMethodDef* python_functions();
 
-} // namespace c10d
-} // namespace distributed
-} // namespace torch
+} // namespace torch::distributed::c10d
diff --git a/torch/csrc/distributed/c10d/error.h b/torch/csrc/distributed/c10d/error.h
index fff2b45c4c95..fef7a630410f 100644
--- a/torch/csrc/distributed/c10d/error.h
+++ b/torch/csrc/distributed/c10d/error.h
@@ -45,12 +45,10 @@ struct formatter {
 
 } // namespace fmt
 
-namespace c10d {
-namespace detail {
+namespace c10d::detail {
 
 inline std::error_code lastError() noexcept {
   return std::error_code{errno, std::generic_category()};
 }
 
-} // namespace detail
-} // namespace c10d
+} // namespace c10d::detail
diff --git a/torch/csrc/distributed/c10d/logging.h b/torch/csrc/distributed/c10d/logging.h
index a7cc82f702ee..6b15aa358f26 100644
--- a/torch/csrc/distributed/c10d/logging.h
+++ b/torch/csrc/distributed/c10d/logging.h
@@ -12,8 +12,7 @@
 #include 
 #include 
 
-namespace c10d {
-namespace detail {
+namespace c10d::detail {
 
 enum class LogLevel { Trace, Debug, Info, Warning, Error };
 
@@ -24,8 +23,7 @@ std::string formatLogMessage(fmt::string_view fmt, T&&... args) {
   return fmt::vformat(fmt, fmt::make_format_args(args...));
 }
 
-} // namespace detail
-} // namespace c10d
+} // namespace c10d::detail
 
 #define C10D_ERROR(...) \
   if (c10d::detail::isLogLevelEnabled(c10d::detail::LogLevel::Error)) \
diff --git a/torch/csrc/distributed/c10d/socket.h b/torch/csrc/distributed/c10d/socket.h
index de9bd6989c29..81659f11f049 100644
--- a/torch/csrc/distributed/c10d/socket.h
+++ b/torch/csrc/distributed/c10d/socket.h
@@ -16,8 +16,7 @@
 #include 
 #include 
 
-namespace c10d {
-namespace detail {
+namespace c10d::detail {
 
 class SocketOptions {
  public:
@@ -103,5 +102,4 @@ class Socket {
   std::unique_ptr impl_;
 };
 
-} // namespace detail
-} // namespace c10d
+} // namespace c10d::detail
diff --git a/torch/csrc/distributed/c10d/socket_fmt.h b/torch/csrc/distributed/c10d/socket_fmt.h
index 8c7832ebf933..491d9241eaf9 100644
--- a/torch/csrc/distributed/c10d/socket_fmt.h
+++ b/torch/csrc/distributed/c10d/socket_fmt.h
@@ -22,11 +22,9 @@ as it exposes the underlying platform specific socket headers.
 #include 
 #endif
 
-namespace c10d {
-namespace detail {
+namespace c10d::detail {
 
 // Returns a human-readable representation of the given socket address.
 std::string formatSockAddr(const struct ::sockaddr* addr, socklen_t len);
 
-} // namespace detail
-} // namespace c10d
+} // namespace c10d::detail