C++ network flow implementation in c10 (#132188)

The functorch partitioners use network flow to split the joint graph into a forward and backward graph. Internally, we've found that upgrading to networkx 2.8.8 (from 2.5) results in some hard-to-debug failures (internal reference: https://fburl.com/workplace/jrqwagdm). And I'm told that there's interest to remove the python dependency.

So this PR introduces a C++ implementation that mirrors the API provided by networkx. We'll need to add python bindings and do some additional testing to verify correctness.

Differential Revision: [D61550977](https://our.internmc.facebook.com/intern/diff/D61550977)
Pull Request resolved: https://github.com/pytorch/pytorch/pull/132188
Approved by: https://github.com/Chillee
This commit is contained in:
David Berard
2024-08-20 09:44:41 -07:00
committed by PyTorch MergeBot
parent 05304f59f0
commit 84b3f1900a
3 changed files with 532 additions and 0 deletions

View File

@ -0,0 +1,175 @@
#include <c10/test/util/Macros.h>
#include <c10/util/NetworkFlow.h>
#include <gtest/gtest.h>
#include <cstdlib>
namespace {
template <typename T>
bool vector_contains(const std::vector<T>& vec, const T& element) {
for (const auto& e : vec) {
if (e == element) {
return true;
}
}
return false;
}
template <typename T>
void expect_vector_contains_subset(
const std::vector<T>& vec,
const std::vector<T>& subset) {
for (auto& element : subset) {
if (!vector_contains(vec, element)) {
std::stringstream ss;
ss << "Failed: checking whether {";
for (auto& e : subset) {
ss << e << ", ";
}
ss << "} is a subset of {";
for (auto& e : vec) {
ss << e << ", ";
}
ss << "}, but couldn't find " << element;
FAIL() << ss.str();
}
}
}
namespace test_network_flow {
TEST(NetworkFlowTest, basic) {
/*
* 3 1 2
* -->b-- ->e--
* / 1| \/ \
* / 2 v 2/\ 2 \
* a---->c-/ ->f---->h
* \ 2\/ /
* \3 1/\ 2/
* -->d-- ->g--
*
* Consider these augmenting paths that constitute a blocking flow:
* a -> d -> f -> h (capacity 1), saturates d->f
* a -> c -> g -> h (capacity 2), saturates a->c, c->g, g->h
* a -> b -> c -> e -> h (capacity 1), saturates b->c
* a -> b -> f -> h (capacity 1), saturates b->f, f->h
*/
c10::NetworkFlowGraph g;
g.add_edge("a", "b", 3); // flow: 2
g.add_edge("a", "c", 2); // flow: 2
g.add_edge("a", "d", 3); // flow: 1
g.add_edge("b", "f", 1); // flow: 1
g.add_edge("c", "e", 2); // flow: 1
g.add_edge("c", "g", 2); // flow: 2
g.add_edge("d", "f", 1); // flow: 1
g.add_edge("b", "c", 1); // flow: 1
g.add_edge("e", "h", 2); // flow: 1
g.add_edge("f", "h", 2); // flow: 2
g.add_edge("g", "h", 2); // flow: 2
auto res = g.minimum_cut("a", "h");
EXPECT_EQ(res.status, c10::MinCutStatus::SUCCESS);
EXPECT_EQ(res.max_flow, 5);
// how we "reach" these vertices from "h":
// h -> e: we see the e->h edge has residual capacity
// e -> c: we see the c->e edge has residual capacity
// c -> g: the c->g edge has flow, therefore the g->c edge has residual
// capacity
expect_vector_contains_subset(res.unreachable, {"h", "e", "c", "g"});
expect_vector_contains_subset(res.reachable, {"a", "b", "d", "f"});
}
TEST(NetworkFlowTest, loop) {
/* 1
* -------------------
* / \
* 1 / 1 1 \ 1
* a --------> b --------> c -------> d --------> e
*/
c10::NetworkFlowGraph g;
g.add_edge("a", "b", 1); // flow: 1
g.add_edge("b", "c", 1); // flow: 1
g.add_edge("c", "d", 1); // flow: 1
g.add_edge("d", "e", 1); // flow: 1
g.add_edge("d", "b", 1); // flow: 0
auto res = g.minimum_cut("a", "e");
EXPECT_EQ(res.status, c10::MinCutStatus::SUCCESS);
EXPECT_EQ(res.max_flow, 1);
expect_vector_contains_subset(res.unreachable, {"e"});
expect_vector_contains_subset(res.reachable, {"a", "b", "c", "d"});
}
TEST(NetworkFlowTest, disconnected_vertices) {
/*
* 1
* c --------> d
*
* 1
* a --------> b
*/
c10::NetworkFlowGraph g;
g.add_edge("a", "b", 1); // flow: 1
g.add_edge("c", "d", 1); // flow: 0
auto res = g.minimum_cut("a", "b");
EXPECT_EQ(res.status, c10::MinCutStatus::SUCCESS);
EXPECT_EQ(res.max_flow, 1);
expect_vector_contains_subset(res.unreachable, {"b"});
// unintuitively, "c" and "d" get marked as reachable; this mirrors networkx
// behavior.
expect_vector_contains_subset(res.reachable, {"a", "c", "d"});
}
TEST(NetworkFlowTest, invalid_endpoints) {
c10::NetworkFlowGraph g;
g.add_edge("a", "b", 1);
auto res = g.minimum_cut("a", "c");
EXPECT_EQ(res.status, c10::MinCutStatus::INVALID);
res = g.minimum_cut("c", "b");
EXPECT_EQ(res.status, c10::MinCutStatus::INVALID);
}
TEST(NetworkFlowTest, unbounded) {
c10::NetworkFlowGraph g;
g.add_edge("a", "b", c10::NetworkFlowGraph::INF);
auto res = g.minimum_cut("a", "b");
EXPECT_EQ(res.status, c10::MinCutStatus::UNBOUNDED);
}
TEST(NetworkFlowTest, overflow) {
c10::NetworkFlowGraph g;
auto flow1 = c10::NetworkFlowGraph::INF / 2;
auto flow2 = c10::NetworkFlowGraph::INF - flow1;
g.add_edge("a", "b", flow1);
g.add_edge("a", "b", flow2);
auto res = g.minimum_cut("a", "b");
EXPECT_EQ(res.status, c10::MinCutStatus::OVERFLOW_INF);
}
TEST(NetworkFlowTest, reverse_edge) {
/*
* 100
* --------
* / \
* 1 < 1 \
* a ---------> b ---------> c
*
*/
c10::NetworkFlowGraph g;
g.add_edge("a", "b", 1);
g.add_edge("b", "c", 1);
g.add_edge("c", "a", 100);
auto res = g.minimum_cut("a", "c");
EXPECT_EQ(res.status, c10::MinCutStatus::SUCCESS);
EXPECT_EQ(res.max_flow, 1);
expect_vector_contains_subset(res.unreachable, {"c"});
expect_vector_contains_subset(res.reachable, {"a", "b"});
}
} // namespace test_network_flow
} // namespace

303
c10/util/NetworkFlow.cpp Normal file
View File

@ -0,0 +1,303 @@
#include <c10/util/NetworkFlow.h>
#include <c10/util/Exception.h>
#include <iostream>
#include <optional>
#include <queue>
#include <unordered_map>
#include <vector>
namespace c10 {
namespace {
struct DinicFlowGraph {
// [Note: Dinic graph format]
// The graph is represented as an adjacency list:
// for a vertex u, adj[u] lists all the outgoing edges from u.
// adj[u][i] is the index of the i-th outgoing edge from u.
// To get information on the i-th outgoing edge from u, use
// edges[adj[i][i]].
// The edges are directed and are paired with a reverse edge.
// For example, an edge u->v is paired with a v->u edge.
// The index of the reverse edge of e is stored as e.other_idx.
// Capacities and flows: each edge has a capacity and a flow
// associated with it. When flow is added to an edge, it removes
// capacity from the reverse edge.
struct Edge {
size_t u, v;
int64_t capacity;
int64_t flow;
size_t other_idx; // reverse edge
int64_t residual_capacity() const {
return capacity - flow;
}
};
std::vector<Edge> edges;
std::vector<std::vector<size_t>> adj; // adjacency list
std::vector<std::string> vertex_names;
std::unordered_map<std::string, size_t> mapping;
size_t graph_size;
void add_flow(Edge& e, int64_t more) {
e.flow += more;
edges[e.other_idx].flow -= more;
}
const Edge& reverse_edge(const Edge& e) const {
return edges[e.other_idx];
}
DinicFlowGraph(const NetworkFlowGraph& g) {
size_t vertex_count = 0;
auto get_idx = [&vertex_count, this](const std::string& name) {
if (!mapping.count(name)) {
TORCH_CHECK(vertex_count == vertex_names.size());
vertex_names.push_back(name);
size_t idx = vertex_count;
vertex_count++;
mapping[name] = idx;
return idx;
}
return mapping[name];
};
for (const auto& [source, dest, capacity] : g.edges) {
auto u = get_idx(source);
auto v = get_idx(dest);
auto fwd_idx = edges.size();
auto bwd_idx = edges.size() + 1;
edges.push_back({u, v, capacity, 0, bwd_idx});
edges.push_back({v, u, 0, 0, fwd_idx});
}
// NOLINTNEXTLINE(cppcoreguidelines-prefer-member-initializer)
graph_size = mapping.size();
adj.resize(graph_size);
for (size_t i = 0; i < edges.size(); ++i) {
adj[edges[i].u].push_back(i);
}
}
std::vector<std::vector<size_t>> residual_level_graph(size_t s) const {
// The residual graph is the graph including only edges
// where edge.residual_capacity() is nonzero, i.e.
// edge.capacity > edge.flow.
// The residual level graph is constructed by:
// 1. doing a BFS on the residual graph, assigning levels
// to each vertex.
// 2. only include edges u->v where level[v] == leve[u] + 1
std::queue<size_t> q;
// let level[u] = 0 if it has not been visited yet.
std::vector<size_t> level(graph_size, 0);
// TODO(davidberard98) we can create this once and reuse it
std::vector<std::vector<size_t>> output_adjacency(graph_size);
level[s] = 1;
q.push(s);
while (!q.empty()) {
size_t u = q.front();
q.pop();
for (const auto& edge_idx : adj[u]) {
const auto& e = edges[edge_idx];
if (e.residual_capacity()) {
if (level[e.v] == 0) {
level[e.v] = level[e.u] + 1;
q.push(e.v);
}
if (level[e.v] == level[e.u] + 1) {
output_adjacency[e.u].push_back(edge_idx);
}
}
}
}
return output_adjacency;
}
std::pair<MinCutStatus, int64_t> augment_iteration(size_t s, size_t t) {
// Perform one iteration of augmenting the flow.
// 1. Create the level graph
// 2. DFS to find augmenting paths
// 3. If encountering edges that don't lead to augmenting paths,
// trim them from the level graph.
// 4. Repeat 2-3 until we can't find any augmenting paths.
std::vector<std::vector<size_t>> level_adj = residual_level_graph(s);
// TODO(davidberard98): implement this DFS with a stack
std::function<int64_t(size_t, size_t, int64_t)> dfs;
dfs = [&level_adj, &dfs, this](
size_t u, size_t t, int64_t cur_cap) -> int64_t {
if (u == t) {
return cur_cap;
}
while (!level_adj[u].empty()) {
// Iterate over the outgoing edges from u.
// If take an edge and find that we can't augment using this edge,
// then delete it from our level graph.
// If we take an edge and it does find an augmenting path, then
// take the augmenting path and exit early
auto edge_idx = level_adj[u].back();
auto& e = edges[edge_idx];
auto taken_cap = dfs(e.v, t, std::min(cur_cap, e.residual_capacity()));
if (taken_cap) {
add_flow(e, taken_cap);
if (!e.residual_capacity()) {
// this edge has no remaining residual capacity, remove it.
level_adj[u].pop_back();
}
return taken_cap;
} else {
// we can't get any capacity from this edge, remove it.
level_adj[u].pop_back();
}
}
return 0;
};
int64_t additional_flow = 0;
while (int64_t f = dfs(s, t, NetworkFlowGraph::INF)) {
if (f == NetworkFlowGraph::INF) {
return {MinCutStatus::UNBOUNDED, 0};
}
additional_flow += f;
if (additional_flow >= NetworkFlowGraph::INF) {
return {MinCutStatus::OVERFLOW_INF, 0};
}
}
return {MinCutStatus::SUCCESS, additional_flow};
}
std::pair<MinCutStatus, int64_t> compute_max_flow(size_t s, size_t t) {
int64_t total_flow = 0;
while (true) {
auto [status, additional_flow] = augment_iteration(s, t);
if (status != MinCutStatus::SUCCESS) {
return {status, 0};
}
if (additional_flow == 0) {
break;
}
total_flow += additional_flow;
if (total_flow >= NetworkFlowGraph::INF) {
return {MinCutStatus::OVERFLOW_INF, 0};
}
}
return {MinCutStatus::SUCCESS, total_flow};
}
std::vector<bool> reverse_bfs_reachable(size_t t) const {
// Find all vertices that are reachable from t in the reverse
// residual graph.
std::vector<bool> seen(graph_size, false);
seen[t] = true;
std::queue<size_t> q;
q.push(t);
while (!q.empty()) {
auto x = q.front();
q.pop();
for (auto& edge_idx : adj[x]) {
// the edge that goes u -> v where v == x
const auto& e = reverse_edge(edges[edge_idx]);
if (!e.residual_capacity()) {
continue;
}
if (!seen[e.u]) {
seen[e.u] = true;
q.push(e.u);
}
}
}
return seen;
}
std::pair<std::vector<size_t>, std::vector<size_t>> partition(
size_t s,
size_t t) {
// Note: the partitioning returns "reachable" / "unreachable",
// but specifically, for "unreachable", it returns "all vertices
// that are reachable from t in the reverse residual graph"
// and for "reachable" it returns all other nodes. This mirrors
// the behavior of networkx.
auto can_reach_t = reverse_bfs_reachable(t);
std::vector<size_t> reachable, unreachable;
for (size_t i = 0; i < graph_size; ++i) {
if (can_reach_t[i]) {
unreachable.push_back(i);
} else {
reachable.push_back(i);
}
}
return std::pair<std::vector<size_t>, std::vector<size_t>>(
std::move(reachable), std::move(unreachable));
}
MinCutResult minimum_cut(const std::string& s, const std::string& t) {
if (mapping.find(s) == mapping.end() || mapping.find(t) == mapping.end()) {
return {
MinCutStatus::INVALID, // status
0, // max_flow
{}, // reachable
{}, // unreachable
};
}
auto s_int = mapping[s];
auto t_int = mapping[t];
auto [status, max_flow] = compute_max_flow(s_int, t_int);
if (status != MinCutStatus::SUCCESS) {
return {
status, // status
0, // max_flow
{}, // reachable
{}, // unreachable
};
}
auto [reachable_idxs, unreachable_idxs] = partition(s_int, t_int);
std::vector<std::string> reachable, unreachable;
auto idxs_to_names = [&](std::vector<size_t>& src,
std::vector<std::string>& dest) {
dest.reserve(src.size());
for (auto idx : src) {
dest.push_back(vertex_names[idx]);
}
};
idxs_to_names(reachable_idxs, reachable);
idxs_to_names(unreachable_idxs, unreachable);
return {
MinCutStatus::SUCCESS,
max_flow,
reachable,
unreachable,
};
}
};
} // namespace
MinCutStatus NetworkFlowGraph::add_edge(
const std::string& source,
const std::string& dest,
int64_t capacity) {
edges.push_back({source, dest, capacity});
return MinCutStatus::SUCCESS;
}
MinCutResult NetworkFlowGraph::minimum_cut(
const std::string& s,
const std::string& t) const {
auto flow_graph = DinicFlowGraph(*this);
return flow_graph.minimum_cut(s, t);
}
} // namespace c10

54
c10/util/NetworkFlow.h Normal file
View File

@ -0,0 +1,54 @@
#pragma once
#include <c10/macros/Macros.h>
#include <string>
#include <vector>
/**
* This file provides a network flow implementation.
* https://en.wikipedia.org/wiki/Flow_network
*
* It aims to mirror some of the behavior of networkx, which is/was used by
* functorch partitioners for splitting the graph into a forward and backward
* graph.
*/
namespace c10 {
enum class C10_API_ENUM MinCutStatus {
SUCCESS = 0,
UNBOUNDED = 1,
OVERFLOW_INF = 2,
INVALID = 3,
};
struct MinCutResult {
MinCutStatus status;
int64_t max_flow;
std::vector<std::string> reachable;
std::vector<std::string> unreachable;
};
// Modeled after networkx implementation
class C10_API NetworkFlowGraph {
public:
// selected such that INF + INF is < INT64_MAX
constexpr static int64_t INF = (1LL << 62) - 1;
struct Edge {
std::string source, dest;
int64_t capacity;
};
MinCutStatus add_edge(
const std::string& source,
const std::string& dest,
int64_t capacity = 1);
MinCutResult minimum_cut(const std::string& s, const std::string& t) const;
std::vector<Edge> edges;
};
} // namespace c10