Remove nomscheduler (#17693)

Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/17693

Remove nomscheduler tool

Reviewed By: yinghai

Differential Revision: D14328168

fbshipit-source-id: 674d0e18596a4dc2bbb6b8d321f4066c4fc454ab
This commit is contained in:
Duc Ngo
2019-03-06 10:31:00 -08:00
committed by Facebook Github Bot
parent 886e482776
commit e9eb18a18c
10 changed files with 0 additions and 1302 deletions

View File

@ -70,7 +70,6 @@ if(NOT BUILD_ATEN_ONLY)
add_subdirectory(utils)
add_subdirectory(predictor)
add_subdirectory(core/nomnigraph)
add_subdirectory(core/nomscheduler)
add_subdirectory(serialize)
if (USE_NVRTC)
add_subdirectory(cuda_rtc)

View File

@ -1,8 +0,0 @@
# ---[ CPU files.
# Collect every nomscheduler source, then carve the *Test.cc files out of the
# library source list so unit tests are not linked into the library target.
file(GLOB_RECURSE NOMSCHEDULER_SRCS *.cc)
file(GLOB_RECURSE NOMSCHEDULER_TEST_SRCS *Test.cc)
exclude(NOMSCHEDULER_SRCS "${NOMSCHEDULER_SRCS}" "${NOMSCHEDULER_TEST_SRCS}")
# Install the public headers, preserving the include/ directory layout.
install(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/include
DESTINATION include
FILES_MATCHING PATTERN "*.h")

View File

@ -1,3 +0,0 @@
# nomscheduler
nomscheduler is a package built on top of nomnigraph that provides support for task-scheduling algorithms.

View File

@ -1,154 +0,0 @@
#include "CriticalPathAnalyzer.h"
#include <vector>
namespace {
// Data structure used by the dynamic programming algorithm.
// One entry exists per (task, device) pair; see State::pathTrace.
struct PathTrace {
// Cost of the critical path starting at this (task, device) pair.
float cost;
// Memoization flag: true once computePathTrace has filled this entry.
bool computed = false;
// Successive (taskId, deviceId) on the critical path.
// -1 means the path ends here (the task has no dependents on the path).
int depTaskId = -1;
int depDeviceId = -1;
};
// List of tasks and device assignment along a path.
// taskIds[i] is executed on deviceIds[i]; both vectors have equal length.
struct FullPathTrace {
std::vector<int> taskIds;
std::vector<int> deviceIds;
};
// Memoization table shared by the recursive critical-path computation.
struct State {
// Sizes the table to nTasks x nDevices; entries start as default PathTrace
// (computed == false) and are filled lazily by computePathTrace.
explicit State(const nomscheduler::SchedulerInput& input) {
int nTasks = input.getNumberOfTasks();
int nDevices = input.getNumberOfDevices();
pathTrace.resize(nTasks);
for (int taskId = 0; taskId < nTasks; taskId++) {
pathTrace.at(taskId).resize(nDevices);
}
}
// Use dynamic programming to compute the theoretical critical path.
// pathTrace[T][D] = theoretical critical path starting from task T,
// given that task T is executed on device D.
// Dynamic programming can be used because the critical path has optimal
// substructure (on a DAG), and can be expressed recursively as follows.
// pathTrace[T][D] =
// argmax(T', compCost(T, D) +
// argmin(D', commCost(T, D, T', D') + pathTrace[T'][D'])
std::vector<std::vector<PathTrace>> pathTrace;
};
// Fill state.pathTrace[taskId][deviceId] (memoized): the cost of the most
// expensive chain of dependents starting at taskId, assuming taskId runs on
// deviceId and each dependent picks its own cost-minimizing device.
void computePathTrace(
const nomscheduler::SchedulerInput& input,
State& state,
int taskId,
int deviceId) {
auto& trace = state.pathTrace.at(taskId).at(deviceId);
if (trace.computed) {
return;
}
int nDevices = input.getNumberOfDevices();
float computationCost =
input.getTaskDeviceCostModel(taskId, deviceId).getComputationCost();
// A task with no dependents costs just its own computation time.
float maxCost = computationCost;
// Recursively compute (with memoization) critical path trace on all possible
// (depTaskId, depDeviceId) combinations where depTaskId is a dependent of
// taskId.
for (auto outEdge : input.getTaskNode(taskId)->getOutEdges()) {
auto depTaskId = outEdge->head()->data().getId();
float dataSize = outEdge->data().getDataSize();
int bestCaseDeviceId = -1;
float bestCaseCost = 0;
for (int depDeviceId = 0; depDeviceId < nDevices; depDeviceId++) {
// Cost to ship taskId's output from deviceId to the dependent's device.
float commCost = dataSize *
input.getDeviceEdge(deviceId, depDeviceId).getDataTransferRate();
// Make sure that path trace is computed on (depTaskId, depDeviceId)
computePathTrace(input, state, depTaskId, depDeviceId);
auto& depTrace = state.pathTrace.at(depTaskId).at(depDeviceId);
float totalCost = computationCost + commCost + depTrace.cost;
if (bestCaseDeviceId == -1 || totalCost < bestCaseCost) {
// Given depTaskId, choose the device assignment that minimizes the
// total computation cost.
bestCaseDeviceId = depDeviceId;
bestCaseCost = totalCost;
}
}
// The critical path runs through the dependent with the largest
// best-case cost.
if (bestCaseCost > maxCost) {
maxCost = bestCaseCost;
trace.depTaskId = depTaskId;
trace.depDeviceId = bestCaseDeviceId;
}
}
trace.computed = true;
trace.cost = maxCost;
}
// Walk the memoized trace pointers starting at (taskId, deviceId) and
// materialize the full critical path as parallel task/device id lists.
// A taskId of -1 yields an empty trace.
FullPathTrace
constructFullPathTrace(const State& state, int taskId, int deviceId) {
  FullPathTrace result;
  for (int curTask = taskId, curDevice = deviceId; curTask != -1;) {
    result.taskIds.push_back(curTask);
    result.deviceIds.push_back(curDevice);
    const auto& trace = state.pathTrace.at(curTask).at(curDevice);
    curTask = trace.depTaskId;
    curDevice = trace.depDeviceId;
  }
  return result;
}
} // namespace
namespace nomscheduler {
// Compute the theoretical critical path of the scheduling problem: the path
// whose best-case (cost-minimizing) device assignment has maximum total cost.
// Returns that cost plus the task/device ids along the path.
CriticalPathOutput CriticalPathAnalyzer::analyze(const SchedulerInput& input) {
  CriticalPathOutput output;
  auto state = State(input);
  float maxCost = 0;
  int nTasks = input.getNumberOfTasks();
  int nDevices = input.getNumberOfDevices();
  // Start of the critical path; stays -1 if there are no tasks.
  int criticalPathTaskId = -1;
  int criticalPathDeviceId = -1;
  for (int taskId = 0; taskId < nTasks; taskId++) {
    // BUGFIX: this is a device index, not a cost. It was declared `float`,
    // which relied on exact float == -1 comparison and an implicit
    // float -> int narrowing when stored into criticalPathDeviceId.
    int bestCaseDeviceId = -1;
    float bestCaseCost = 0;
    for (int deviceId = 0; deviceId < nDevices; deviceId++) {
      computePathTrace(input, state, taskId, deviceId);
      auto& trace = state.pathTrace.at(taskId).at(deviceId);
      // Best case: the device assignment minimizing this task's path cost.
      if (bestCaseDeviceId == -1 || trace.cost < bestCaseCost) {
        bestCaseCost = trace.cost;
        bestCaseDeviceId = deviceId;
      }
    }
    // Critical path: the task whose best-case path cost is maximal.
    if (bestCaseCost > maxCost) {
      maxCost = bestCaseCost;
      criticalPathTaskId = taskId;
      criticalPathDeviceId = bestCaseDeviceId;
    }
  }
  // constructFullPathTrace returns an empty trace when taskId == -1.
  auto fullPathTrace =
      constructFullPathTrace(state, criticalPathTaskId, criticalPathDeviceId);
  output.setOutput(maxCost, fullPathTrace.taskIds, fullPathTrace.deviceIds);
  return output;
}
} // namespace nomscheduler

View File

@ -1,68 +0,0 @@
//===----------------------------------------------------------------------===//
//
// Tool to analyze (theoretical) critical path of a task scheduling problems.
//
//===----------------------------------------------------------------------===//
#ifndef NOM_SCHEDULER_CRITICAL_PATH_ANALYZER_H
#define NOM_SCHEDULER_CRITICAL_PATH_ANALYZER_H
#include <vector>
#include "Scheduler.h"
namespace nomscheduler {
// Result of a critical-path analysis: the path's total cost plus the task ids
// and per-task device assignment along the path.
class CriticalPathOutput {
 public:
  float getTotalCost() const {
    return totalCost_;
  }
  std::vector<int> getTaskIds() const {
    return taskIds_;
  }
  std::vector<int> getDeviceIds() const {
    return deviceIds_;
  }
  // Set all fields at once; taskIds and deviceIds are parallel vectors
  // (taskIds[i] is assigned to deviceIds[i]).
  void setOutput(
      float totalCost,
      const std::vector<int>& taskIds,
      const std::vector<int>& deviceIds) {
    totalCost_ = totalCost;
    taskIds_ = taskIds;
    deviceIds_ = deviceIds;
  }

 private:
  // BUGFIX: initialized — a default-constructed output previously exposed an
  // indeterminate totalCost_ through getTotalCost().
  float totalCost_ = 0;
  // Tasks along the critical path.
  std::vector<int> taskIds_;
  // Device assignment of the tasks along the critical path.
  std::vector<int> deviceIds_;
};
class CriticalPathAnalyzer {
 public:
// Analyze the theoretical critical path(s) of an input scheduling problem.
// Communication cost should be taken into account.
// Formal definition
// The best-scenario computation cost of a path
// Task1 -> ... -> TaskK
// in the task dependency graph
// is defined by choosing a device assignment
// (Device1, ... , DeviceK)
// that minimizes the total computation cost (communication cost is taken
// into account).
//
// The critical path is defined as the path that has the maximum
// best-scenario computation cost.
CriticalPathOutput analyze(const SchedulerInput& input);
};
} // namespace nomscheduler
#endif // NOM_SCHEDULER_CRITICAL_PATH_ANALYZER_H

View File

@ -1,76 +0,0 @@
#ifndef NOM_SCHEDULER_HEFT_SCHEDULER_INTERNAL_H
#define NOM_SCHEDULER_HEFT_SCHEDULER_INTERNAL_H
#include <limits>
#include <vector>

#include "Scheduler.h"
namespace heftscheduler {
// Internal state associated with a task while the algorithm is running.
struct TaskState {
  // Average computation cost of the task across all devices.
  // BUGFIX: members are zero-initialized; they were previously indeterminate
  // until the corresponding compute pass ran.
  float avgComputationCost = 0;
  // The upward rank of a task T is defined recursively as:
  // upwardRank(T) =
  //   avgComputationCost(T) + max(avgCommCost(T, T') + upwardRank(T'))
  // Basically, upwardRank(T) is the length of the critical path from
  // task T to the exit task, including the computation cost of task T.
  float upwardRank = 0;
  // Memoization flag for the recursive upward-rank computation.
  bool upwardRankComputed = false;
};
// Represents a slot of time to schedule tasks on a device.
// Additionally store the number of used cores, for intra-op parallelism.
struct CoreSlot {
  // Time window covered by this slot.
  // BUGFIX: zero-initialized; the members were previously indeterminate on a
  // default-constructed slot.
  float startTime = 0;
  float endTime = 0;
  // Number of cores already occupied during this window.
  int usedCores = 0;
};
// Internal state associated with a device while the algorithm is running.
struct DeviceState {
// Maintain a list of slots per device.
// The slots are guaranteed to be continuous (each slot's end is the next
// slot's start) and together cover the whole time horizon.
std::vector<CoreSlot> slots;
};
// Internal state of the HEFT scheduling algorithm.
struct AlgorithmState {
  // Sets up per-task state, the (initially unsorted) task order, and one
  // unbounded free slot per device covering the entire time horizon.
  explicit AlgorithmState(const nomscheduler::SchedulerInput& input) {
    int nTasks = input.getNumberOfTasks();
    tasksState.resize(nTasks);
    // Initial, unsorted values.
    taskIdsByUpwardRank.resize(nTasks);
    for (int taskId = 0; taskId < nTasks; taskId++) {
      taskIdsByUpwardRank[taskId] = taskId;
    }
    int nDevices = input.getNumberOfDevices();
    devicesState.resize(nDevices);
    for (int deviceId = 0; deviceId < nDevices; deviceId++) {
      CoreSlot all;
      all.startTime = 0;
      all.endTime = std::numeric_limits<float>::infinity();
      all.usedCores = 0;
      // Initially, there is no task scheduled on each device, so we just make
      // one slot that covers the entire time horizon.
      devicesState.at(deviceId).slots.emplace_back(all);
    }
  }
  std::vector<TaskState> tasksState;
  std::vector<DeviceState> devicesState;
  // Task ids sorted by upward ranks in decreasing order.
  // It can be shown that this is also a topological order.
  std::vector<int> taskIdsByUpwardRank;
  // Average data transfer rate between two devices.
  // BUGFIX: zero-initialized — previously indeterminate until
  // computeAverageDataTransferRate ran.
  float avgDataTransferRate = 0;
};
} // namespace heftscheduler
#endif // NOM_SCHEDULER_HEFT_SCHEDULER_INTERNAL_H

View File

@ -1,317 +0,0 @@
#include "HEFTScheduler.h"
#include <vector>
namespace {
// Compute one task's computation cost averaged over every device, and store
// it in the task's algorithm state.
void computeAverageComputationCost(
    const nomscheduler::SchedulerInput& input,
    heftscheduler::AlgorithmState& state,
    int taskId) {
  const int nDevices = input.getNumberOfDevices();
  float total = 0;
  for (int deviceId = 0; deviceId < nDevices; deviceId++) {
    total +=
        input.getTaskDeviceCostModel(taskId, deviceId).getComputationCost();
  }
  state.tasksState.at(taskId).avgComputationCost = total / nDevices;
}
// Compute the average computation cost for every task in the input.
void computeAverageComputationCost(
    const nomscheduler::SchedulerInput& input,
    heftscheduler::AlgorithmState& state) {
  const int nTasks = input.getNumberOfTasks();
  for (int taskId = 0; taskId < nTasks; taskId++) {
    computeAverageComputationCost(input, state, taskId);
  }
}
// Average the data-transfer rate over all ordered pairs of distinct devices
// and store it in the algorithm state. With zero or one device the average
// stays 0 (there are no inter-device links).
void computeAverageDataTransferRate(
    const nomscheduler::SchedulerInput& input,
    heftscheduler::AlgorithmState& state) {
  const int nDevices = input.getNumberOfDevices();
  float total = 0;
  for (int src = 0; src < nDevices; src++) {
    for (int dst = 0; dst < nDevices; dst++) {
      if (src == dst) {
        continue;
      }
      total += input.getDeviceEdge(src, dst).getDataTransferRate();
    }
  }
  state.avgDataTransferRate =
      (nDevices > 1) ? total / (nDevices * (nDevices - 1)) : total;
}
// Compute (memoized) the upward rank of taskId: its average computation cost
// plus the largest (average communication cost + upward rank) over its
// dependents. Requires avgComputationCost and avgDataTransferRate to have
// been computed already.
void computeUpwardRank(
const nomscheduler::SchedulerInput& input,
heftscheduler::AlgorithmState& state,
int taskId) {
auto& taskState = state.tasksState.at(taskId);
if (taskState.upwardRankComputed) {
return;
}
float maxDependentCost = 0;
for (auto outEdge : input.getTaskNode(taskId)->getOutEdges()) {
auto dependentTask = outEdge->head();
auto dependentTaskId = dependentTask->data().getId();
// Rank dependents first; the recursion terminates because the task
// dependency graph is a DAG.
computeUpwardRank(input, state, dependentTaskId);
// No placement decision exists yet, so communication cost uses the
// device-pair average rate.
float avgCommCost =
input.getTaskEdge(taskId, dependentTaskId).getDataSize() *
state.avgDataTransferRate;
maxDependentCost = std::max(
maxDependentCost,
avgCommCost + state.tasksState.at(dependentTaskId).upwardRank);
}
taskState.upwardRankComputed = true;
taskState.upwardRank = taskState.avgComputationCost + maxDependentCost;
}
// Compute the upward rank of every task. Each per-task call memoizes, so the
// overall work is linear in the size of the dependency graph.
void computeUpwardRank(
    const nomscheduler::SchedulerInput& input,
    heftscheduler::AlgorithmState& state) {
  const int nTasks = input.getNumberOfTasks();
  for (int taskId = 0; taskId < nTasks; taskId++) {
    computeUpwardRank(input, state, taskId);
  }
}
// Order task ids so that tasks with a higher upward rank come first.
// Relative order of equal-rank tasks is unspecified (std::sort), exactly as
// in the original implementation.
void sortTasksByUpwardRank(heftscheduler::AlgorithmState& state) {
  auto& order = state.taskIdsByUpwardRank;
  auto higherRankFirst = [&state](int lhs, int rhs) {
    return state.tasksState[lhs].upwardRank > state.tasksState[rhs].upwardRank;
  };
  std::sort(order.begin(), order.end(), higherRankFirst);
}
// Task assignment information on a specific device.
struct TaskAssignment {
  // True when the task can be placed on the device in some time window.
  // BUGFIX: members are zero-initialized; `possible` in particular was
  // previously read before being set in some call sequences.
  bool possible = false;
  // Start/end time of the earliest feasible execution window.
  float start = 0;
  float end = 0;
};
// Compute the earliest possible start time of a task on a device based
// on dependency graph information and current schedule.
// NOTE(review): `state` is unused here; it appears to be kept for signature
// symmetry with the other helpers.
float computeEarliestPossibleStartTimeFromDAG(
const nomscheduler::SchedulerInput& input,
const heftscheduler::AlgorithmState& state,
const nomscheduler::SchedulerOutput& output,
int taskId,
int deviceId) {
float result = 0.0f;
for (auto& inEdge : input.getTaskNode(taskId)->getInEdges()) {
auto prereqTaskId = inEdge->tail()->data().getId();
auto& prereqScheduleItem = output.getTaskScheduleItem(prereqTaskId);
// Since the algorithm schedule tasks in topological order, at this point
// all the prerequisites should have been scheduled.
assert(prereqScheduleItem.isAssigned());
// Communication time to send output from the prerequisite task to the
// current task.
float commTime = inEdge->data().getDataSize() *
input.getDeviceEdge(prereqScheduleItem.getAssignedDeviceId(), deviceId)
.getDataTransferRate();
// The task can start only after every prerequisite has finished AND its
// output has arrived on this device.
result = std::max(result, prereqScheduleItem.getEndTime() + commTime);
}
return result;
}
// Find the earliest feasible time window on `deviceId` in which `taskId` can
// run, honoring both the dependency-derived earliest start time and per-slot
// core availability. On return, assignment.possible indicates feasibility and
// assignment.start/end give the chosen window.
void computeEarliestTaskAssignment(
const nomscheduler::SchedulerInput& input,
const heftscheduler::AlgorithmState& state,
const nomscheduler::SchedulerOutput& output,
int taskId,
int deviceId,
TaskAssignment& assignment) {
assignment.possible = false;
auto& costModel = input.getTaskDeviceCostModel(taskId, deviceId);
if (!costModel.isPossible()) {
// If the task cannot be scheduled on the device.
return;
}
float earliestPossibleStartTime = computeEarliestPossibleStartTimeFromDAG(
input, state, output, taskId, deviceId);
int coresUsedByTask = input.getTask(taskId).getIntraDeviceParallelism();
int coresInDevice = input.getDevice(deviceId).getNumberOfCores();
auto& slots = state.devicesState.at(deviceId).slots;
// Try each slot as a candidate start; the first feasible window wins, which
// yields the earliest assignment because slots are time-ordered.
for (int slotId = 0; slotId < slots.size(); slotId++) {
auto& slot = slots.at(slotId);
if (earliestPossibleStartTime > slot.endTime) {
// Ignore slots that end before the earliest possible start time.
continue;
}
float requiredStart = std::max(earliestPossibleStartTime, slot.startTime);
float requiredEnd = requiredStart + costModel.getComputationCost();
// Find a range of slots that can accommodate the task.
bool found = false;
for (int endSlotId = slotId; endSlotId < slots.size(); endSlotId++) {
auto& endSlot = slots[endSlotId];
if (endSlot.usedCores + coresUsedByTask > coresInDevice) {
// Not enough cores to execute the task.
break;
}
if (requiredEnd <= endSlot.endTime) {
// We found a range of slots that covers the time window to execute
// the task.
found = true;
break;
}
}
if (found) {
assignment.possible = true;
assignment.start = requiredStart;
assignment.end = requiredEnd;
break;
}
}
}
// Update the list of slots for a device, given the schedule of a new task
// on that device. Splits the boundary slots so that the assignment window
// [assignment.start, assignment.end] aligns exactly with whole slots, then
// bumps the usedCores count of every slot inside the window.
void updateSlots(
const nomscheduler::SchedulerInput& input,
heftscheduler::AlgorithmState& state,
int taskId,
int deviceId,
const TaskAssignment& assignment) {
auto& slots = state.devicesState.at(deviceId).slots;
// Find start slot of the slot ranges that cover the task assignment time
// window.
int startSlotId = 0;
while (startSlotId < slots.size() &&
assignment.start >= slots.at(startSlotId).endTime) {
startSlotId++;
}
// Start slot must exist.
assert(
startSlotId < slots.size() &&
assignment.start < slots.at(startSlotId).endTime);
if (assignment.start > slots.at(startSlotId).startTime) {
auto& startSlot = slots.at(startSlotId);
// Split the start slot into two, so the window begins exactly on a slot
// boundary. The earlier half keeps the old usedCores count.
heftscheduler::CoreSlot newSlot;
newSlot.startTime = startSlot.startTime;
newSlot.endTime = assignment.start;
newSlot.usedCores = startSlot.usedCores;
startSlot.startTime = assignment.start;
slots.insert(slots.begin() + startSlotId, newSlot);
startSlotId++;
}
// Find end slot of the slot ranges that cover the task assignment time
// window.
int endSlotId = startSlotId;
while (endSlotId < slots.size() &&
assignment.end > slots.at(endSlotId).endTime) {
endSlotId++;
}
// End slot must exist.
assert(
endSlotId < slots.size() &&
assignment.end <= slots.at(endSlotId).endTime);
if (assignment.end < slots.at(endSlotId).endTime) {
// Split the end slot into two, so the window ends exactly on a slot
// boundary. The earlier half is the one inside the window.
auto& endSlot = slots.at(endSlotId);
heftscheduler::CoreSlot newSlot;
newSlot.startTime = endSlot.startTime;
newSlot.endTime = assignment.end;
newSlot.usedCores = endSlot.usedCores;
endSlot.startTime = assignment.end;
slots.insert(slots.begin() + endSlotId, newSlot);
}
// Now we just update the usedCores count of the slots.
int coresUsedByTask = input.getTask(taskId).getIntraDeviceParallelism();
for (int slotId = startSlotId; slotId <= endSlotId; slotId++) {
slots.at(slotId).usedCores += coresUsedByTask;
}
}
// Schedule a single task: evaluate the earliest feasible assignment on every
// device, pick the one with the smallest finish time, record it in the
// output, and update the chosen device's slot bookkeeping. Marks the whole
// output as failed if no device can run the task.
void scheduleTask(
const nomscheduler::SchedulerInput& input,
heftscheduler::AlgorithmState& state,
nomscheduler::SchedulerOutput& output,
int taskId) {
// For each device, calculate the earliest possible assignment of the task
// to the device (or if the task can even be assigned to the device at all).
std::vector<TaskAssignment> assignments(input.getNumberOfDevices());
for (int deviceId = 0; deviceId < input.getNumberOfDevices(); deviceId++) {
computeEarliestTaskAssignment(
input, state, output, taskId, deviceId, assignments.at(deviceId));
}
// Select the device that minimizes the earliest finish time of the task.
int scheduledDeviceId = -1;
for (int deviceId = 0; deviceId < input.getNumberOfDevices(); deviceId++) {
auto& assignment = assignments.at(deviceId);
if (assignment.possible &&
((scheduledDeviceId == -1 ||
assignment.end < assignments.at(scheduledDeviceId).end))) {
scheduledDeviceId = deviceId;
}
}
if (scheduledDeviceId == -1) {
// No device can execute the task.
output.setFailure(true);
} else {
auto& assignment = assignments.at(scheduledDeviceId);
auto& taskScheduleItem = output.getMutableTaskScheduleItem(taskId);
taskScheduleItem.setAssignedDeviceId(scheduledDeviceId);
taskScheduleItem.setStartTime(assignment.start);
taskScheduleItem.setEndTime(assignment.end);
updateSlots(input, state, taskId, scheduledDeviceId, assignment);
}
}
// Schedule every task, visiting them in decreasing upward-rank order (which
// is also a topological order of the dependency DAG). Stops as soon as any
// task fails to schedule.
void scheduleTasks(
    const nomscheduler::SchedulerInput& input,
    heftscheduler::AlgorithmState& state,
    nomscheduler::SchedulerOutput& output) {
  for (int taskId : state.taskIdsByUpwardRank) {
    scheduleTask(input, state, output, taskId);
    if (output.isFailure()) {
      return;
    }
  }
}
} // namespace
namespace nomscheduler {
// Run the full HEFT pipeline and return both the schedule and the internal
// algorithm state (the latter is exposed for unit testing).
std::pair<SchedulerOutput, heftscheduler::AlgorithmState>
HEFTScheduler::scheduleInternal(const SchedulerInput& input) {
heftscheduler::AlgorithmState state(input);
// Phase 1: task prioritization — average costs, upward ranks, then sort
// tasks by decreasing rank.
computeAverageComputationCost(input, state);
computeAverageDataTransferRate(input, state);
computeUpwardRank(input, state);
sortTasksByUpwardRank(state);
// Phase 2: processor selection — insertion-based earliest-finish-time
// placement, task by task.
SchedulerOutput output(input.getNumberOfTasks());
output.setFailure(false);
scheduleTasks(input, state, output);
return std::make_pair(output, state);
}
// Public entry point: run the internal pipeline and drop the algorithm state.
SchedulerOutput HEFTScheduler::schedule(const SchedulerInput& input) {
  auto outputAndState = scheduleInternal(input);
  return outputAndState.first;
}
} // namespace nomscheduler

View File

@ -1,35 +0,0 @@
//===----------------------------------------------------------------------===//
//
// nomnigraph supports for task scheduling problems.
// HEFT-based implementation of scheduler.
//
// (Heterogeneous Earliest Finish Time)
// Original description:
// Performance-effective and low-complexity task scheduling
// for heterogeneous computing
// H. Topcuoglu; S. Hariri; Min-You Wu
// IEEE Transactions on Parallel and Distributed Systems 2002
//
//===----------------------------------------------------------------------===//
#ifndef NOM_SCHEDULER_HEFT_SCHEDULER_H
#define NOM_SCHEDULER_HEFT_SCHEDULER_H
#include "HEFTScheduler-Internal.h"
#include "Scheduler.h"
namespace nomscheduler {
// HEFT scheduler: ranks tasks by upward rank and greedily assigns each to
// the device minimizing its earliest finish time.
// BUGFIX: inheritance made public — `class X : Scheduler` defaults to
// private, which prevented using HEFTScheduler through the Scheduler
// interface.
class HEFTScheduler : public Scheduler {
 public:
  // Compute a schedule for the input problem (see Scheduler::schedule).
  SchedulerOutput schedule(const SchedulerInput& input) override;
  // Expose a scheduling method that also returns an internal algorithm state
  // for unit testing purpose.
  std::pair<SchedulerOutput, heftscheduler::AlgorithmState> scheduleInternal(
      const SchedulerInput& input);
};
} // namespace nomscheduler
#endif // NOM_SCHEDULER_HEFT_SCHEDULER_H

View File

@ -1,350 +0,0 @@
//===----------------------------------------------------------------------===//
//
// nomnigraph supports for task scheduling problems.
//
//===----------------------------------------------------------------------===//
#ifndef NOM_SCHEDULER_SCHEDULER_H
#define NOM_SCHEDULER_SCHEDULER_H
#include "caffe2/core/common.h"
#include "nomnigraph/Graph/Graph.h"
#include <algorithm>
#include <vector>
namespace nomscheduler {
// Models a processing unit (such as CPU/GPU/accelerator/...) that can execute
// a task.
class Device {
 public:
  Device() {}
  // Number of parallel cores available on the device.
  int getNumberOfCores() const {
    return numberOfCores_;
  }
  void setNumberOfCores(int numberOfCores) {
    numberOfCores_ = numberOfCores;
  }
  // Memory capacity of the device. unit: GB
  float getMaxMemory() const {
    return maxMemory_;
  }
  void setMaxMemory(float maxMemory) {
    maxMemory_ = maxMemory;
  }

 private:
  // BUGFIX: in-class initializers — the getters previously returned
  // indeterminate values on a default-constructed Device.
  int numberOfCores_ = 0;
  float maxMemory_ = 0;
};
// Models a link between two devices.
class DeviceEdge {
 public:
  DeviceEdge() {}
  // data transfer rate between two devices (unit: s / bytes)
  float getDataTransferRate() const {
    return dataTransferRate_;
  }
  void setDataTransferRate(float dataTransferRate) {
    dataTransferRate_ = dataTransferRate;
  }

 private:
  // BUGFIX: zero-initialized — previously indeterminate until set.
  float dataTransferRate_ = 0;
};
// Models a unit of work that can be scheduled to run on a device.
// NOTE: the int constructor is intentionally left implicit — callers rely on
// the int -> Task conversion (e.g. graph node creation from a task id).
class Task {
 public:
  Task(int taskId) : taskId_(taskId) {}
  // number of cores that will be used by the task
  int getIntraDeviceParallelism() const {
    return intraDeviceParallelism_;
  }
  void setIntraDeviceParallelism(int intraDeviceParallelism) {
    intraDeviceParallelism_ = intraDeviceParallelism;
  }
  // static memory consumed by the task, unit: GB
  float getStaticMemoryConsumed() const {
    return staticMemoryConsumed_;
  }
  void setStaticMemoryConsumed(float staticMemoryConsumed) {
    staticMemoryConsumed_ = staticMemoryConsumed;
  }
  int getId() const {
    return taskId_;
  }

 private:
  // BUGFIX: zero-initialized — previously indeterminate until set.
  int intraDeviceParallelism_ = 0;
  float staticMemoryConsumed_ = 0;
  int taskId_;
};
// Model a dependency between two tasks. An edge between Task A -> Task B
// means that Task B depends on the output of Task A, and so Task B must start
// after task A finishes.
// The edge A->B also holds the size of the data that needs to be transferred
// from task A to task B, i.e. the total size of the blobs produced by A
// and consumed by B.
class TaskEdge {
 public:
  // size of data transferred between two tasks (unit : bytes)
  float getDataSize() const {
    return dataSize_;
  }
  void setDataSize(float dataSize) {
    dataSize_ = dataSize;
  }

 private:
  // BUGFIX: zero-initialized — previously indeterminate until set.
  float dataSize_ = 0;
};
// Represents the cost model of a task executed on a specific device.
class TaskDeviceEdge {
 public:
  // estimated computation cost for a task executed by a device
  // (runtime, unit: ms)
  float getComputationCost() const {
    return computationCost_;
  }
  void setComputationCost(float computationCost) {
    computationCost_ = computationCost;
  }
  // Return true if the task can be executed by the device.
  bool isPossible() const {
    return possible_;
  }
  void setPossible(bool possible) {
    possible_ = possible;
  }

 private:
  // BUGFIX: zero-initialized — infeasible (task, device) pairs only call
  // setPossible(false), leaving the cost indeterminate before this fix.
  float computationCost_ = 0;
  bool possible_ = true;
};
// Dependency DAG between tasks; edge A -> B means B consumes A's output.
using TaskGraph = nom::Graph<Task, TaskEdge>;
// (undirected) graph between devices, to represent communication links
// between devices
using DeviceGraph = nom::Graph<Device, DeviceEdge>;
// (bipartite) task - device graph, represents estimated cost model for
// task execution on each device
// We don't currently store data on this graph's node, so int type is just a
// placeholder.
using TaskDeviceCostModelGraph = nom::Graph<int /*unused*/, TaskDeviceEdge>;
// Input to the scheduler. Underneath, the input is represented by one
// TaskGraph, one DeviceGraph and one TaskDeviceCostModelGraph.
class SchedulerInput {
 public:
// Pre-creates all task and device nodes, a complete bipartite cost-model
// graph (every task x every device), and a complete device graph (including
// self-edges). Task dependency edges are added later via
// createTaskDependency.
SchedulerInput(int numTasks, int numDevices) {
for (int taskId = 0; taskId < numTasks; taskId++) {
tasks_.emplace_back(taskGraph_.createNode(taskId));
taskNodes_.emplace_back(costModelGraph_.createNode());
}
for (int deviceId = 0; deviceId < numDevices; deviceId++) {
devices_.emplace_back(deviceGraph_.createNode());
deviceNodes_.emplace_back(costModelGraph_.createNode());
}
for (int taskId = 0; taskId < numTasks; taskId++) {
for (int deviceId = 0; deviceId < numDevices; deviceId++) {
costModelGraph_.createEdge(
taskNodes_[taskId], deviceNodes_[deviceId], TaskDeviceEdge());
}
}
for (int deviceId1 = 0; deviceId1 < numDevices; deviceId1++) {
for (int deviceId2 = 0; deviceId2 < numDevices; deviceId2++) {
deviceGraph_.createEdge(
devices_[deviceId1], devices_[deviceId2], DeviceEdge());
}
}
}
int getNumberOfDevices() const {
return deviceGraph_.getNodesCount();
}
int getNumberOfTasks() const {
return taskGraph_.getNodesCount();
}
// Mutable/const accessors for per-device and per-task payloads.
// Ids must be within range; no bounds checking is performed.
Device* getMutableDevice(int deviceId) {
return devices_[deviceId]->mutableData();
}
Task* getMutableTask(int taskId) {
return tasks_[taskId]->mutableData();
}
const Device& getDevice(int deviceId) const {
return devices_[deviceId]->data();
}
const Task& getTask(int taskId) const {
return tasks_[taskId]->data();
}
// Adds a dependency edge taskId1 -> taskId2 (taskId2 consumes taskId1's
// output) with a default TaskEdge payload.
void createTaskDependency(int taskId1, int taskId2) {
taskGraph_.createEdge(tasks_[taskId1], tasks_[taskId2], TaskEdge());
}
// Cost-model, device-link and task-dependency edge accessors. The edges
// must already exist (all cost-model and device edges are created by the
// constructor; task edges require a prior createTaskDependency call).
TaskDeviceEdge* getMutableTaskDeviceCostModel(int taskId, int deviceId) {
return costModelGraph_.getEdge(taskNodes_[taskId], deviceNodes_[deviceId])
->mutableData();
}
const TaskDeviceEdge& getTaskDeviceCostModel(int taskId, int deviceId) const {
return costModelGraph_.getEdge(taskNodes_[taskId], deviceNodes_[deviceId])
->data();
}
DeviceEdge* getMutableDeviceEdge(int deviceId1, int deviceId2) {
return deviceGraph_.getEdge(devices_[deviceId1], devices_[deviceId2])
->mutableData();
}
const DeviceEdge& getDeviceEdge(int deviceId1, int deviceId2) const {
return deviceGraph_.getEdge(devices_[deviceId1], devices_[deviceId2])
->data();
}
TaskEdge* getMutableTaskEdge(int taskId1, int taskId2) {
return taskGraph_.getEdge(tasks_[taskId1], tasks_[taskId2])->mutableData();
}
const TaskEdge& getTaskEdge(int taskId1, int taskId2) const {
return taskGraph_.getEdge(tasks_[taskId1], tasks_[taskId2])->data();
}
// Raw graph node for a task, used by callers to walk in/out edges.
TaskGraph::NodeRef getTaskNode(int taskId) const {
return tasks_[taskId];
}
 private:
TaskGraph taskGraph_;
DeviceGraph deviceGraph_;
TaskDeviceCostModelGraph costModelGraph_;
// Node handles indexed by task/device id, kept in creation order.
std::vector<TaskGraph::NodeRef> tasks_;
std::vector<DeviceGraph::NodeRef> devices_;
std::vector<TaskDeviceCostModelGraph::NodeRef> taskNodes_;
std::vector<TaskDeviceCostModelGraph::NodeRef> deviceNodes_;
};
// Represents a schedule item for a task. Consists of the device that the task
// should be assigned to, and the (estimated) start and end time of the task
// execution based on the cost model given to the scheduler.
class TaskScheduleItem {
 public:
  int getAssignedDeviceId() const {
    return assignedDeviceId_;
  }
  // A task is "assigned" once a device id has been set.
  bool isAssigned() const {
    return assignedDeviceId_ != -1;
  }
  void setAssignedDeviceId(int assignedDeviceId) {
    assignedDeviceId_ = assignedDeviceId;
  }
  float getStartTime() const {
    return startTime_;
  }
  void setStartTime(float startTime) {
    startTime_ = startTime;
  }
  float getEndTime() const {
    return endTime_;
  }
  void setEndTime(float endTime) {
    endTime_ = endTime;
  }

 private:
  int assignedDeviceId_ = -1;
  // BUGFIX: zero-initialized — SchedulerOutput::getFinishTime reads every
  // item's end time, including items that were never assigned, which
  // previously read indeterminate values.
  float startTime_ = 0;
  float endTime_ = 0;
};
// Represents an output of the static scheduler - a map from each task
// to a TaskScheduleItem for that task.
class SchedulerOutput {
 public:
  // Creates one (unassigned) schedule item per task.
  SchedulerOutput(int numTasks) {
    for (int i = 0; i < numTasks; i++) {
      taskScheduleItems_.emplace_back();
    }
  }
  TaskScheduleItem& getMutableTaskScheduleItem(int taskId) {
    return taskScheduleItems_[taskId];
  }
  const TaskScheduleItem& getTaskScheduleItem(int taskId) const {
    return taskScheduleItems_[taskId];
  }
  // The finish time of the schedule, which is just the maximum end time
  // of all the schedule items.
  float getFinishTime() const {
    float result = 0;
    for (auto& scheduleItem : taskScheduleItems_) {
      result = std::max(result, scheduleItem.getEndTime());
    }
    return result;
  }
  // True when the scheduler failed to compute a schedule.
  bool isFailure() const {
    return failure_;
  }
  void setFailure(bool failure) {
    failure_ = failure;
  }

 private:
  std::vector<TaskScheduleItem> taskScheduleItems_;
  // BUGFIX: default to "success" — schedulers that never call setFailure
  // (e.g. the SimpleScheduler used in tests) previously left this flag
  // indeterminate, so isFailure() was undefined behavior.
  bool failure_ = false;
};
// Interface for static schedulers.
class Scheduler {
 public:
virtual ~Scheduler() {}
// Compute a schedule (per-task device assignment and start/end times) for
// the given problem instance.
virtual SchedulerOutput schedule(const SchedulerInput&) = 0;
};
} // namespace nomscheduler
#endif // NOM_SCHEDULER_SCHEDULER_H

View File

@ -1,290 +0,0 @@
#include <algorithm>
#include <cstdio>
#include <limits>
#include <sstream>
#include <unordered_set>
#include "nomscheduler/Scheduler/CriticalPathAnalyzer.h"
#include "nomscheduler/Scheduler/HEFTScheduler.h"
#include "nomscheduler/Scheduler/Scheduler.h"
#include <gtest/gtest.h>
namespace nomscheduler {
// Parse a scheduling problem from a whitespace-separated text format:
//   numTasks numDevices
//   cores per device                  (numDevices ints)
//   parallelism per task              (numTasks ints)
//   computation cost matrix           (numTasks x numDevices; < 0 = the task
//                                      cannot run on that device)
//   device transfer-rate matrix       (numDevices x numDevices)
//   task data-size matrix             (numTasks x numTasks; > 0 creates a
//                                      dependency edge task1 -> task2)
//   max memory per device             (numDevices floats)
//   static memory per task            (numTasks floats)
SchedulerInput loadSchedulerInputFromString(const std::string& fileInput) {
std::stringstream ss;
ss << fileInput;
int numTasks, numDevices;
ss >> numTasks >> numDevices;
SchedulerInput result(numTasks, numDevices);
// Cores per devices
for (int id = 0; id < numDevices; id++) {
int numCores;
ss >> numCores;
result.getMutableDevice(id)->setNumberOfCores(numCores);
}
// Parallelism per task
for (int id = 0; id < numTasks; id++) {
int parallelismLevel;
ss >> parallelismLevel;
result.getMutableTask(id)->setIntraDeviceParallelism(parallelismLevel);
}
// The computation costs of each task
for (int taskId = 0; taskId < numTasks; taskId++) {
for (int deviceId = 0; deviceId < numDevices; deviceId++) {
float cost;
ss >> cost;
// A negative cost marks an infeasible (task, device) pair.
if (cost < 0) {
result.getMutableTaskDeviceCostModel(taskId, deviceId)
->setPossible(false);
} else {
result.getMutableTaskDeviceCostModel(taskId, deviceId)
->setComputationCost(cost);
}
}
}
// Data transfer rate between every ordered pair of devices.
for (int deviceId1 = 0; deviceId1 < numDevices; deviceId1++) {
for (int deviceId2 = 0; deviceId2 < numDevices; deviceId2++) {
float rate;
ss >> rate;
result.getMutableDeviceEdge(deviceId1, deviceId2)
->setDataTransferRate(rate);
}
}
// Task dependency matrix: a positive entry creates edge task1 -> task2
// carrying that data size.
for (int taskId1 = 0; taskId1 < numTasks; taskId1++) {
for (int taskId2 = 0; taskId2 < numTasks; taskId2++) {
float dataSize;
ss >> dataSize;
if (dataSize > 0) {
result.createTaskDependency(taskId1, taskId2);
result.getMutableTaskEdge(taskId1, taskId2)->setDataSize(dataSize);
}
}
}
// Per-device memory capacity.
for (int deviceId = 0; deviceId < numDevices; deviceId++) {
float maxMemory;
ss >> maxMemory;
result.getMutableDevice(deviceId)->setMaxMemory(maxMemory);
}
// Per-task static memory consumption.
for (int id = 0; id < numTasks; id++) {
float staticMemoryConsumed;
ss >> staticMemoryConsumed;
result.getMutableTask(id)->setStaticMemoryConsumed(staticMemoryConsumed);
}
return result;
}
// A simple scheduling algorithm, just for testing and comparison purposes.
// For each iteration:
// - Pick any task that is ready to schedule (no unscheduled dependency)
// - Then pick the device that has the earliest next available time to
//   schedule that task.
// For simplicity, this algorithm does not take into account any resource
// constraints.
// BUGFIX: inheritance made public (class default is private), matching the
// Scheduler interface contract.
class SimpleScheduler : public Scheduler {
 public:
  SchedulerOutput schedule(const SchedulerInput& input) override {
    const int numTasks = input.getNumberOfTasks();
    SchedulerOutput result(numTasks);
    std::unordered_set<TaskGraph::NodeRef> scheduledTasks;
    // Next available time per device.
    std::vector<float> nextFreeTime(input.getNumberOfDevices(), 0.0f);
    // BUGFIX: size() is unsigned; compare against a matching type instead of
    // a signed int (avoids -Wsign-compare and signed/unsigned surprises).
    while (scheduledTasks.size() < static_cast<size_t>(numTasks)) {
      for (int taskId = 0; taskId < numTasks; taskId++) {
        auto taskNode = input.getTaskNode(taskId);
        if (scheduledTasks.count(taskNode)) {
          continue;
        }
        // A task is ready once all of its prerequisites are scheduled.
        bool hasDependency = false;
        for (auto& inEdge : taskNode->getInEdges()) {
          if (!scheduledTasks.count(inEdge->tail())) {
            hasDependency = true;
            break;
          }
        }
        if (!hasDependency) {
          scheduledTasks.insert(taskNode);
          // Find the device with earliest next available time.
          int earliestDeviceId = 0;
          for (int deviceId = 1; deviceId < input.getNumberOfDevices();
               deviceId++) {
            if (nextFreeTime[deviceId] < nextFreeTime[earliestDeviceId]) {
              earliestDeviceId = deviceId;
            }
          }
          // Schedule the task on the device.
          auto& taskScheduleItem = result.getMutableTaskScheduleItem(taskId);
          taskScheduleItem.setAssignedDeviceId(earliestDeviceId);
          taskScheduleItem.setStartTime(nextFreeTime[earliestDeviceId]);
          auto computationCost =
              input.getTaskDeviceCostModel(taskId, earliestDeviceId)
                  .getComputationCost();
          taskScheduleItem.setEndTime(
              taskScheduleItem.getStartTime() + computationCost);
          // Update next available time for the device.
          nextFreeTime[earliestDeviceId] = taskScheduleItem.getEndTime();
          break;
        }
      }
    }
    return result;
  }
};
} // namespace nomscheduler
// Builds the reference scheduling problem used by all tests below:
// 10 tasks on 3 devices (the worked example from the original HEFT paper).
// Layout, as consumed by loadSchedulerInputFromString: header with task and
// device counts, per-device cores, per-task parallelism, per-task/device
// computation costs, device-to-device transfer rates, task-to-task data
// sizes (-1 = no edge), then per-device max memory and per-task static
// memory consumption.
nomscheduler::SchedulerInput getTestInput() {
  static const char* const kTestInput = R"(
10 3
1 1 1
1 1 1 1 1 1 1 1 1 1
14 16 9
13 19 18
11 13 19
13 8 17
12 13 10
13 16 9
7 15 11
5 11 14
18 12 20
21 7 16
0 1 1
1 0 1
1 1 0
-1 18 12 9 11 14 -1 -1 -1 -1
-1 -1 -1 -1 -1 -1 -1 19 16 -1
-1 -1 -1 -1 -1 -1 23 -1 -1 -1
-1 -1 -1 -1 -1 -1 -1 27 23 -1
-1 -1 -1 -1 -1 -1 -1 -1 13 -1
-1 -1 -1 -1 -1 -1 -1 15 -1 -1
-1 -1 -1 -1 -1 -1 -1 -1 -1 17
-1 -1 -1 -1 -1 -1 -1 -1 -1 11
-1 -1 -1 -1 -1 -1 -1 -1 -1 13
-1 -1 -1 -1 -1 -1 -1 -1 -1 -1
256 256 256
12 1 1 18 12 1 12 1 10 6
)";
  return nomscheduler::loadSchedulerInputFromString(kTestInput);
}
// Parses the reference input and runs the naive SimpleScheduler over it,
// spot-checking both the parsed graph and the resulting finish time.
TEST(Scheduler, SchedulerTest) {
  auto testInput = getTestInput();
  // Parser sanity checks: tasks, devices, costs, edges and memory fields.
  EXPECT_EQ(testInput.getNumberOfTasks(), 10);
  EXPECT_EQ(testInput.getNumberOfDevices(), 3);
  EXPECT_EQ(testInput.getDevice(1).getNumberOfCores(), 1);
  EXPECT_EQ(testInput.getTask(6).getIntraDeviceParallelism(), 1);
  EXPECT_EQ(testInput.getTaskDeviceCostModel(0, 0).getComputationCost(), 14);
  EXPECT_EQ(testInput.getTaskDeviceCostModel(8, 1).getComputationCost(), 12);
  EXPECT_EQ(testInput.getDeviceEdge(0, 2).getDataTransferRate(), 1.0f);
  EXPECT_EQ(testInput.getTaskEdge(0, 3).getDataSize(), 9);
  EXPECT_EQ(testInput.getDevice(2).getMaxMemory(), 256);
  EXPECT_EQ(testInput.getTask(3).getStaticMemoryConsumed(), 18);
  // The greedy scheduler finishes the whole graph at t = 55 on this input.
  auto simpleScheduler = nomscheduler::SimpleScheduler();
  auto schedule = simpleScheduler.schedule(testInput);
  EXPECT_EQ(schedule.getFinishTime(), 55);
}
// Reproduces the worked example from the original HEFT paper: verifies the
// scheduler's intermediate ranking state (average costs, upward ranks, task
// ordering) and then the final device assignment and start times.
TEST(Scheduler, HEFTSchedulerTest) {
  auto tolerance = 1E-3;
  auto testInput = getTestInput();
  auto heft = nomscheduler::HEFTScheduler();
  auto outputAndState = heft.scheduleInternal(testInput);
  auto state = outputAndState.second;
  EXPECT_NEAR(state.avgDataTransferRate, 1.0f, tolerance);
  // Task 9 has no dependents, so its upward rank is just its average cost.
  auto task9 = state.tasksState.at(9);
  EXPECT_NEAR(task9.avgComputationCost, 14.6666f, tolerance);
  EXPECT_NEAR(task9.upwardRank, task9.avgComputationCost, tolerance);
  // Task 8's rank adds its own average cost plus the (8 -> 9) communication
  // cost and task 9's rank.
  auto task8 = state.tasksState.at(8);
  EXPECT_NEAR(task8.avgComputationCost, 16.6666f, tolerance);
  EXPECT_NEAR(
      task8.upwardRank,
      task8.avgComputationCost +
          testInput.getTaskEdge(8, 9).getDataSize() /
              state.avgDataTransferRate +
          state.tasksState.at(9).upwardRank,
      tolerance);
  EXPECT_NEAR(task8.upwardRank, 44.333f, tolerance);
  // Task 0 is the entry task; its rank goes through successor task 1.
  auto task0 = state.tasksState.at(0);
  EXPECT_NEAR(task0.avgComputationCost, 13.0f, tolerance);
  EXPECT_NEAR(
      task0.upwardRank,
      task0.avgComputationCost +
          testInput.getTaskEdge(0, 1).getDataSize() /
              state.avgDataTransferRate +
          state.tasksState.at(1).upwardRank,
      tolerance);
  EXPECT_NEAR(task0.upwardRank, 108.0f, tolerance);
  // Tasks must be processed in decreasing upward-rank order.
  auto expectedOrder = std::vector<int>{0, 2, 3, 1, 4, 5, 8, 6, 7, 9};
  EXPECT_EQ(state.taskIdsByUpwardRank, expectedOrder);
  // Verify the output of the HEFT scheduler.
  // The input and output in this unit test matches the example in the
  // original HEFT paper.
  auto output = outputAndState.first;
  EXPECT_FALSE(output.isFailure());
  EXPECT_NEAR(output.getFinishTime(), 80, tolerance);
  auto expectedDevices = std::vector<int>{2, 0, 2, 1, 2, 1, 2, 0, 1, 1};
  auto expectedStarts =
      std::vector<float>{0, 27, 9, 18, 28, 26, 38, 57, 56, 73};
  auto actualDevices = std::vector<int>();
  auto actualStarts = std::vector<float>();
  for (int taskId = 0; taskId < testInput.getNumberOfTasks(); taskId++) {
    auto& scheduleItem = output.getTaskScheduleItem(taskId);
    actualDevices.emplace_back(scheduleItem.getAssignedDeviceId());
    actualStarts.emplace_back(scheduleItem.getStartTime());
  }
  EXPECT_EQ(actualDevices, expectedDevices);
  EXPECT_EQ(actualStarts, expectedStarts);
}
// The theoretical critical path on the reference input is 0 -> 1 -> 8 -> 9,
// all placed on device 1, with a total cost of 54 time units.
TEST(Scheduler, CriticalPathAnalyzer) {
  auto testInput = getTestInput();
  auto pathAnalyzer = nomscheduler::CriticalPathAnalyzer();
  auto result = pathAnalyzer.analyze(testInput);
  EXPECT_EQ(result.getTotalCost(), 54.0f);
  EXPECT_EQ(result.getTaskIds(), (std::vector<int>{0, 1, 8, 9}));
  EXPECT_EQ(result.getDeviceIds(), (std::vector<int>{1, 1, 1, 1}));
}