From 84b3f1900a4f9e709aefbbaaea8d2adc52e1b58b Mon Sep 17 00:00:00 2001 From: David Berard Date: Tue, 20 Aug 2024 09:44:41 -0700 Subject: [PATCH] C++ network flow implementation in c10 (#132188) The functorch partitioners use network flow to split the joint graph into a forward and backward graph. Internally, we've found that upgrading to networkx 2.8.8 (from 2.5) results in some hard-to-debug failures (internal reference: https://fburl.com/workplace/jrqwagdm). And I'm told that there's interest to remove the python dependency. So this PR introduces a C++ implementation that mirrors the API provided by networkx. We'll need to add python bindings and do some additional testing to verify correctness. Differential Revision: [D61550977](https://our.internmc.facebook.com/intern/diff/D61550977) Pull Request resolved: https://github.com/pytorch/pytorch/pull/132188 Approved by: https://github.com/Chillee --- c10/test/util/NetworkFlow_test.cpp | 175 +++++++++++++++++ c10/util/NetworkFlow.cpp | 303 +++++++++++++++++++++++++++++ c10/util/NetworkFlow.h | 54 +++++ 3 files changed, 532 insertions(+) create mode 100644 c10/test/util/NetworkFlow_test.cpp create mode 100644 c10/util/NetworkFlow.cpp create mode 100644 c10/util/NetworkFlow.h diff --git a/c10/test/util/NetworkFlow_test.cpp b/c10/test/util/NetworkFlow_test.cpp new file mode 100644 index 000000000000..8c88815bdaf6 --- /dev/null +++ b/c10/test/util/NetworkFlow_test.cpp @@ -0,0 +1,175 @@ +#include +#include +#include +#include + +namespace { + +template +bool vector_contains(const std::vector& vec, const T& element) { + for (const auto& e : vec) { + if (e == element) { + return true; + } + } + return false; +} + +template +void expect_vector_contains_subset( + const std::vector& vec, + const std::vector& subset) { + for (auto& element : subset) { + if (!vector_contains(vec, element)) { + std::stringstream ss; + ss << "Failed: checking whether {"; + for (auto& e : subset) { + ss << e << ", "; + } + ss << "} is a subset of {"; + for (auto& e : vec) { + ss << e << ", "; + } + ss << "}, but couldn't find " << element; + FAIL() << ss.str(); + } + } +} + +namespace test_network_flow { + +TEST(NetworkFlowTest, basic) { + /* + * 3 1 2 + * -->b-- ->e-- + * / 1| \/ \ + * / 2 v 2/\ 2 \ + * a---->c-/ ->f---->h + * \ 2\/ / + * \3 1/\ 2/ + * -->d-- ->g-- + * + * Consider these augmenting paths that constitute a blocking flow: + * a -> d -> f -> h (capacity 1), saturates d->f + * a -> c -> g -> h (capacity 2), saturates a->c, c->g, g->h + * a -> b -> c -> e -> h (capacity 1), saturates b->c + * a -> b -> f -> h (capacity 1), saturates b->f, f->h + */ + c10::NetworkFlowGraph g; + g.add_edge("a", "b", 3); // flow: 2 + g.add_edge("a", "c", 2); // flow: 2 + g.add_edge("a", "d", 3); // flow: 1 + g.add_edge("b", "f", 1); // flow: 1 + g.add_edge("c", "e", 2); // flow: 1 + g.add_edge("c", "g", 2); // flow: 2 + g.add_edge("d", "f", 1); // flow: 1 + g.add_edge("b", "c", 1); // flow: 1 + g.add_edge("e", "h", 2); // flow: 1 + g.add_edge("f", "h", 2); // flow: 2 + g.add_edge("g", "h", 2); // flow: 2 + auto res = g.minimum_cut("a", "h"); + EXPECT_EQ(res.status, c10::MinCutStatus::SUCCESS); + EXPECT_EQ(res.max_flow, 5); + + // how we "reach" these vertices from "h": + // h -> e: we see the e->h edge has residual capacity + // e -> c: we see the c->e edge has residual capacity + // c -> g: the c->g edge has flow, therefore the g->c edge has residual + // capacity + expect_vector_contains_subset(res.unreachable, {"h", "e", "c", "g"}); + expect_vector_contains_subset(res.reachable, {"a", "b", "d", "f"}); +} + +TEST(NetworkFlowTest, loop) { + /* 1 + * ------------------- + * / \ + * 1 / 1 1 \ 1 + * a --------> b --------> c -------> d --------> e + */ + c10::NetworkFlowGraph g; + g.add_edge("a", "b", 1); // flow: 1 + g.add_edge("b", "c", 1); // flow: 1 + g.add_edge("c", "d", 1); // flow: 1 + g.add_edge("d", "e", 1); // flow: 1 + g.add_edge("d", "b", 1); // flow: 0 + auto res = g.minimum_cut("a", "e"); + EXPECT_EQ(res.status, c10::MinCutStatus::SUCCESS); + EXPECT_EQ(res.max_flow, 1); + + expect_vector_contains_subset(res.unreachable, {"e"}); + expect_vector_contains_subset(res.reachable, {"a", "b", "c", "d"}); +} + +TEST(NetworkFlowTest, disconnected_vertices) { + /* + * 1 + * c --------> d + * + * 1 + * a --------> b + */ + c10::NetworkFlowGraph g; + g.add_edge("a", "b", 1); // flow: 1 + g.add_edge("c", "d", 1); // flow: 0 + auto res = g.minimum_cut("a", "b"); + EXPECT_EQ(res.status, c10::MinCutStatus::SUCCESS); + EXPECT_EQ(res.max_flow, 1); + + expect_vector_contains_subset(res.unreachable, {"b"}); + // unintuitively, "c" and "d" get marked as reachable; this mirrors networkx + // behavior. + expect_vector_contains_subset(res.reachable, {"a", "c", "d"}); +} + +TEST(NetworkFlowTest, invalid_endpoints) { + c10::NetworkFlowGraph g; + g.add_edge("a", "b", 1); + auto res = g.minimum_cut("a", "c"); + EXPECT_EQ(res.status, c10::MinCutStatus::INVALID); + + res = g.minimum_cut("c", "b"); + EXPECT_EQ(res.status, c10::MinCutStatus::INVALID); +} + +TEST(NetworkFlowTest, unbounded) { + c10::NetworkFlowGraph g; + g.add_edge("a", "b", c10::NetworkFlowGraph::INF); + auto res = g.minimum_cut("a", "b"); + EXPECT_EQ(res.status, c10::MinCutStatus::UNBOUNDED); +} + +TEST(NetworkFlowTest, overflow) { + c10::NetworkFlowGraph g; + auto flow1 = c10::NetworkFlowGraph::INF / 2; + auto flow2 = c10::NetworkFlowGraph::INF - flow1; + g.add_edge("a", "b", flow1); + g.add_edge("a", "b", flow2); + auto res = g.minimum_cut("a", "b"); + EXPECT_EQ(res.status, c10::MinCutStatus::OVERFLOW_INF); +} + +TEST(NetworkFlowTest, reverse_edge) { + /* + * 100 + * -------- + * / \ + * 1 < 1 \ + * a ---------> b ---------> c + * + */ + c10::NetworkFlowGraph g; + g.add_edge("a", "b", 1); + g.add_edge("b", "c", 1); + g.add_edge("c", "a", 100); + auto res = g.minimum_cut("a", "c"); + EXPECT_EQ(res.status, c10::MinCutStatus::SUCCESS); + EXPECT_EQ(res.max_flow, 1); + + expect_vector_contains_subset(res.unreachable, {"c"}); + expect_vector_contains_subset(res.reachable, {"a", "b"}); +} + +} // namespace test_network_flow + +} // namespace diff --git a/c10/util/NetworkFlow.cpp b/c10/util/NetworkFlow.cpp new file mode 100644 index 000000000000..17eb40371987 --- /dev/null +++ b/c10/util/NetworkFlow.cpp @@ -0,0 +1,303 @@ +#include + +#include + +#include +#include +#include +#include +#include + +namespace c10 { + +namespace { + +struct DinicFlowGraph { + // [Note: Dinic graph format] + // The graph is represented as an adjacency list: + // for a vertex u, adj[u] lists all the outgoing edges from u. + // adj[u][i] is the index of the i-th outgoing edge from u. + // To get information on the i-th outgoing edge from u, use + // edges[adj[i][i]]. + // The edges are directed and are paired with a reverse edge. + // For example, an edge u->v is paired with a v->u edge. + // The index of the reverse edge of e is stored as e.other_idx. + // Capacities and flows: each edge has a capacity and a flow + // associated with it. When flow is added to an edge, it removes + // capacity from the reverse edge. + struct Edge { + size_t u, v; + int64_t capacity; + int64_t flow; + size_t other_idx; // reverse edge + + int64_t residual_capacity() const { + return capacity - flow; + } + }; + + std::vector edges; + std::vector> adj; // adjacency list + std::vector vertex_names; + std::unordered_map mapping; + size_t graph_size; + + void add_flow(Edge& e, int64_t more) { + e.flow += more; + edges[e.other_idx].flow -= more; + } + + const Edge& reverse_edge(const Edge& e) const { + return edges[e.other_idx]; + } + + DinicFlowGraph(const NetworkFlowGraph& g) { + size_t vertex_count = 0; + + auto get_idx = [&vertex_count, this](const std::string& name) { + if (!mapping.count(name)) { + TORCH_CHECK(vertex_count == vertex_names.size()); + vertex_names.push_back(name); + size_t idx = vertex_count; + vertex_count++; + mapping[name] = idx; + return idx; + } + return mapping[name]; + }; + + for (const auto& [source, dest, capacity] : g.edges) { + auto u = get_idx(source); + auto v = get_idx(dest); + auto fwd_idx = edges.size(); + auto bwd_idx = edges.size() + 1; + edges.push_back({u, v, capacity, 0, bwd_idx}); + edges.push_back({v, u, 0, 0, fwd_idx}); + } + + // NOLINTNEXTLINE(cppcoreguidelines-prefer-member-initializer) + graph_size = mapping.size(); + adj.resize(graph_size); + + for (size_t i = 0; i < edges.size(); ++i) { + adj[edges[i].u].push_back(i); + } + } + + std::vector> residual_level_graph(size_t s) const { + // The residual graph is the graph including only edges + // where edge.residual_capacity() is nonzero, i.e. + // edge.capacity > edge.flow. + // The residual level graph is constructed by: + // 1. doing a BFS on the residual graph, assigning levels + // to each vertex. + // 2. only include edges u->v where level[v] == leve[u] + 1 + std::queue q; + // let level[u] = 0 if it has not been visited yet. + std::vector level(graph_size, 0); + // TODO(davidberard98) we can create this once and reuse it + std::vector> output_adjacency(graph_size); + level[s] = 1; + q.push(s); + while (!q.empty()) { + size_t u = q.front(); + q.pop(); + for (const auto& edge_idx : adj[u]) { + const auto& e = edges[edge_idx]; + if (e.residual_capacity()) { + if (level[e.v] == 0) { + level[e.v] = level[e.u] + 1; + q.push(e.v); + } + if (level[e.v] == level[e.u] + 1) { + output_adjacency[e.u].push_back(edge_idx); + } + } + } + } + + return output_adjacency; + } + + std::pair augment_iteration(size_t s, size_t t) { + // Perform one iteration of augmenting the flow. + // 1. Create the level graph + // 2. DFS to find augmenting paths + // 3. If encountering edges that don't lead to augmenting paths, + // trim them from the level graph. + // 4. Repeat 2-3 until we can't find any augmenting paths. + std::vector> level_adj = residual_level_graph(s); + + // TODO(davidberard98): implement this DFS with a stack + std::function dfs; + dfs = [&level_adj, &dfs, this]( + size_t u, size_t t, int64_t cur_cap) -> int64_t { + if (u == t) { + return cur_cap; + } + while (!level_adj[u].empty()) { + // Iterate over the outgoing edges from u. + // If take an edge and find that we can't augment using this edge, + // then delete it from our level graph. + // If we take an edge and it does find an augmenting path, then + // take the augmenting path and exit early + auto edge_idx = level_adj[u].back(); + auto& e = edges[edge_idx]; + auto taken_cap = dfs(e.v, t, std::min(cur_cap, e.residual_capacity())); + if (taken_cap) { + add_flow(e, taken_cap); + if (!e.residual_capacity()) { + // this edge has no remaining residual capacity, remove it. + level_adj[u].pop_back(); + } + return taken_cap; + } else { + // we can't get any capacity from this edge, remove it. + level_adj[u].pop_back(); + } + } + return 0; + }; + + int64_t additional_flow = 0; + while (int64_t f = dfs(s, t, NetworkFlowGraph::INF)) { + if (f == NetworkFlowGraph::INF) { + return {MinCutStatus::UNBOUNDED, 0}; + } + additional_flow += f; + if (additional_flow >= NetworkFlowGraph::INF) { + return {MinCutStatus::OVERFLOW_INF, 0}; + } + } + + return {MinCutStatus::SUCCESS, additional_flow}; + } + + std::pair compute_max_flow(size_t s, size_t t) { + int64_t total_flow = 0; + while (true) { + auto [status, additional_flow] = augment_iteration(s, t); + if (status != MinCutStatus::SUCCESS) { + return {status, 0}; + } + if (additional_flow == 0) { + break; + } + total_flow += additional_flow; + if (total_flow >= NetworkFlowGraph::INF) { + return {MinCutStatus::OVERFLOW_INF, 0}; + } + } + return {MinCutStatus::SUCCESS, total_flow}; + } + + std::vector reverse_bfs_reachable(size_t t) const { + // Find all vertices that are reachable from t in the reverse + // residual graph. + std::vector seen(graph_size, false); + seen[t] = true; + std::queue q; + q.push(t); + while (!q.empty()) { + auto x = q.front(); + q.pop(); + for (auto& edge_idx : adj[x]) { + // the edge that goes u -> v where v == x + const auto& e = reverse_edge(edges[edge_idx]); + if (!e.residual_capacity()) { + continue; + } + + if (!seen[e.u]) { + seen[e.u] = true; + q.push(e.u); + } + } + } + return seen; + } + + std::pair, std::vector> partition( + size_t s, + size_t t) { + // Note: the partitioning returns "reachable" / "unreachable", + // but specifically, for "unreachable", it returns "all vertices + // that are reachable from t in the reverse residual graph" + // and for "reachable" it returns all other nodes. This mirrors + // the behavior of networkx. + auto can_reach_t = reverse_bfs_reachable(t); + std::vector reachable, unreachable; + for (size_t i = 0; i < graph_size; ++i) { + if (can_reach_t[i]) { + unreachable.push_back(i); + } else { + reachable.push_back(i); + } + } + return std::pair, std::vector>( + std::move(reachable), std::move(unreachable)); + } + + MinCutResult minimum_cut(const std::string& s, const std::string& t) { + if (mapping.find(s) == mapping.end() || mapping.find(t) == mapping.end()) { + return { + MinCutStatus::INVALID, // status + 0, // max_flow + {}, // reachable + {}, // unreachable + }; + } + auto s_int = mapping[s]; + auto t_int = mapping[t]; + auto [status, max_flow] = compute_max_flow(s_int, t_int); + if (status != MinCutStatus::SUCCESS) { + return { + status, // status + 0, // max_flow + {}, // reachable + {}, // unreachable + }; + } + + auto [reachable_idxs, unreachable_idxs] = partition(s_int, t_int); + std::vector reachable, unreachable; + + auto idxs_to_names = [&](std::vector& src, + std::vector& dest) { + dest.reserve(src.size()); + for (auto idx : src) { + dest.push_back(vertex_names[idx]); + } + }; + + idxs_to_names(reachable_idxs, reachable); + idxs_to_names(unreachable_idxs, unreachable); + + return { + MinCutStatus::SUCCESS, + max_flow, + reachable, + unreachable, + }; + } +}; + +} // namespace + +MinCutStatus NetworkFlowGraph::add_edge( + const std::string& source, + const std::string& dest, + int64_t capacity) { + edges.push_back({source, dest, capacity}); + return MinCutStatus::SUCCESS; +} + +MinCutResult NetworkFlowGraph::minimum_cut( + const std::string& s, + const std::string& t) const { + auto flow_graph = DinicFlowGraph(*this); + + return flow_graph.minimum_cut(s, t); +} + +} // namespace c10 diff --git a/c10/util/NetworkFlow.h b/c10/util/NetworkFlow.h new file mode 100644 index 000000000000..684b88906578 --- /dev/null +++ b/c10/util/NetworkFlow.h @@ -0,0 +1,54 @@ +#pragma once + +#include + +#include +#include + +/** + * This file provides a network flow implementation. + * https://en.wikipedia.org/wiki/Flow_network + * + * It aims to mirror some of the behavior of networkx, which is/was used by + * functorch partitioners for splitting the graph into a forward and backward + * graph. + */ + +namespace c10 { + +enum class C10_API_ENUM MinCutStatus { + SUCCESS = 0, + UNBOUNDED = 1, + OVERFLOW_INF = 2, + INVALID = 3, +}; + +struct MinCutResult { + MinCutStatus status; + int64_t max_flow; + std::vector reachable; + std::vector unreachable; +}; + +// Modeled after networkx implementation +class C10_API NetworkFlowGraph { + public: + // selected such that INF + INF is < INT64_MAX + constexpr static int64_t INF = (1LL << 62) - 1; + + struct Edge { + std::string source, dest; + int64_t capacity; + }; + + MinCutStatus add_edge( + const std::string& source, + const std::string& dest, + int64_t capacity = 1); + + MinCutResult minimum_cut(const std::string& s, const std::string& t) const; + + std::vector edges; +}; + +} // namespace c10