mirror of
https://github.com/pytorch/pytorch.git
synced 2025-10-20 12:54:11 +08:00
[1/2] Intel GPU Runtime Upstreaming for Stream (#117611)
# Motivation As mentioned in [[RFC] Intel GPU Runtime Upstreaming](https://github.com/pytorch/pytorch/issues/114842), the second runtime component we would like to upstream is `Stream` which contains the device management functions of Intel GPU's runtime. To facilitate the code review, we split the code changes into 2 PRs. This is one of the 2 PRs and covers the changes under `c10`. # Design Intel GPU stream is a wrapper of sycl queue which schedules kernels on a sycl device. In our design, we will maintain a sycl queue pool containing 32 queues per priority per device. And when a queue is requested one of these queues is returned round-robin. The corresponding C++ files related to `Device` will be placed in `c10/xpu` folder. We provide the `c10::xpu::XPUStream` APIs, like - `XPUStream getStreamFromPool` - `XPUStream getCurrentXPUStream` - `void setCurrentXPUStream` - `void device_synchronize` # Additional Context In our plan, 2 PRs should be submitted to PyTorch for `Stream`: 1. for c10 2. for python frontend. The differences with CUDA: no default and external stream in XPU and lack of the below API: - `getDefaultCUDAStream` - `getStreamFromExternal` for cuda, `cuda::device_synchronize` can sync all streams on the device, but for xpu, `xpu::sync_streams_on_device` only sync all reserved streams on the device. Pull Request resolved: https://github.com/pytorch/pytorch/pull/117611 Approved by: https://github.com/EikanWang, https://github.com/jgong5, https://github.com/gujinghui, https://github.com/malfet
This commit is contained in:
committed by
PyTorch MergeBot
parent
7d516bbd5f
commit
0a41ac3cf3
@ -6,11 +6,14 @@ include(../../cmake/public/xpu.cmake)
|
||||
|
||||
set(C10_XPU_SRCS
|
||||
XPUFunctions.cpp
|
||||
XPUStream.cpp
|
||||
)
|
||||
set(C10_XPU_HEADERS
|
||||
XPUDeviceProp.h
|
||||
XPUException.h
|
||||
XPUFunctions.h
|
||||
XPUMacros.h
|
||||
XPUStream.h
|
||||
)
|
||||
|
||||
add_library(c10_xpu ${C10_XPU_SRCS} ${C10_XPU_HEADERS})
|
||||
|
22
c10/xpu/XPUException.h
Normal file
22
c10/xpu/XPUException.h
Normal file
@ -0,0 +1,22 @@
|
||||
#pragma once
|
||||
|
||||
#include <c10/util/Exception.h>
|
||||
#include <sycl/sycl.hpp>
|
||||
|
||||
namespace c10::xpu {
|
||||
|
||||
static inline sycl::async_handler asyncHandler = [](sycl::exception_list el) {
|
||||
if (el.size() == 0) {
|
||||
return;
|
||||
}
|
||||
for (const auto& e : el) {
|
||||
try {
|
||||
std::rethrow_exception(e);
|
||||
} catch (sycl::exception& e) {
|
||||
TORCH_WARN("SYCL Exception: ", e.what());
|
||||
}
|
||||
}
|
||||
throw;
|
||||
};
|
||||
|
||||
} // namespace c10::xpu
|
285
c10/xpu/XPUStream.cpp
Normal file
285
c10/xpu/XPUStream.cpp
Normal file
@ -0,0 +1,285 @@
|
||||
#include <c10/util/CallOnce.h>
|
||||
#include <c10/util/irange.h>
|
||||
#include <c10/xpu/XPUException.h>
|
||||
#include <c10/xpu/XPUStream.h>
|
||||
|
||||
#include <atomic>
|
||||
#include <deque>
|
||||
#include <mutex>
|
||||
#include <vector>
|
||||
|
||||
namespace c10::xpu {
|
||||
namespace {
|
||||
|
||||
// Global stream state and constants
|
||||
c10::once_flag init_flag;
|
||||
DeviceIndex num_gpus = -1;
|
||||
constexpr int kStreamsPerPoolBits = 5;
|
||||
constexpr int kStreamsPerPool = 1 << kStreamsPerPoolBits;
|
||||
constexpr int kStreamTypeBits = 3;
|
||||
|
||||
// The SYCL queue pools are lazily initialized when the first queue is requested
|
||||
// for a device. The device flags track the initialization of each device. When
|
||||
// a queue is requested, the next queue in the pool to be returned in a
|
||||
// round-robin fashion, see Note [Stream Management].
|
||||
std::deque<c10::once_flag> device_flags;
|
||||
std::vector<std::array<
|
||||
std::array<std::unique_ptr<sycl::queue>, kStreamsPerPool>,
|
||||
max_compile_time_stream_priorities>>
|
||||
streams;
|
||||
std::deque<
|
||||
std::array<std::atomic<uint32_t>, max_compile_time_stream_priorities>>
|
||||
priority_counters;
|
||||
|
||||
thread_local std::unique_ptr<StreamId[]> current_streams = nullptr;
|
||||
|
||||
// Note [StreamId assignment]
|
||||
// ~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
// How do we assign stream IDs?
|
||||
//
|
||||
// -- 57 bits -- -- 5 bits ----- -- 3 bits --
|
||||
// zeros StreamIdIndex StreamIdType
|
||||
//
|
||||
// Where StreamIdType:
|
||||
// 000 = normal priority queue
|
||||
// 001 = high priority queue
|
||||
//
|
||||
// StreamId is 64-bit, so we can just rely on regular promotion rules.
|
||||
// We rely on StreamIdIndex and StreamIdType being non-negative;
|
||||
|
||||
using StreamIdIndex = uint8_t;
|
||||
enum class StreamIdType : uint8_t {
|
||||
// The higher the type number, the higher the priority.
|
||||
NORMAL = 0x0,
|
||||
HIGH = 0X1,
|
||||
};
|
||||
|
||||
inline std::ostream& operator<<(std::ostream& stream, StreamIdType q) {
|
||||
switch (q) {
|
||||
case StreamIdType::NORMAL:
|
||||
return stream << "NORMAL";
|
||||
case StreamIdType::HIGH:
|
||||
return stream << "HIGH";
|
||||
default:
|
||||
break;
|
||||
}
|
||||
return stream << static_cast<int16_t>(q);
|
||||
}
|
||||
|
||||
inline StreamIdType streamIdType(StreamId s) {
|
||||
int mask_for_type = (1 << kStreamTypeBits) - 1;
|
||||
auto st = static_cast<StreamIdType>(s & mask_for_type);
|
||||
TORCH_CHECK(
|
||||
st == StreamIdType::NORMAL || st == StreamIdType::HIGH,
|
||||
"invalid StreamId: ",
|
||||
s);
|
||||
return st;
|
||||
}
|
||||
|
||||
inline StreamIdIndex streamIdIndex(StreamId s) {
|
||||
return static_cast<StreamIdIndex>(
|
||||
(s >> kStreamTypeBits) & ((1 << kStreamsPerPoolBits) - 1));
|
||||
}
|
||||
|
||||
inline StreamId makeStreamId(StreamIdType st, StreamIdIndex si) {
|
||||
return (static_cast<StreamId>(si) << kStreamTypeBits) |
|
||||
static_cast<StreamId>(st);
|
||||
}
|
||||
|
||||
void initGlobalStreamState() {
|
||||
num_gpus = c10::xpu::device_count();
|
||||
device_flags.resize(num_gpus);
|
||||
streams.resize(num_gpus);
|
||||
priority_counters.resize(num_gpus);
|
||||
}
|
||||
|
||||
// Creates the reserved SYCL queue pools for the specified device. It should be
|
||||
// call only once.
|
||||
void initDeviceStreamState(DeviceIndex device) {
|
||||
using namespace sycl::ext::oneapi::property;
|
||||
// Need to align with StreamIdType.
|
||||
const std::vector<sycl::property_list> properties = {
|
||||
{sycl::property::queue::in_order(), queue::priority_normal()},
|
||||
{sycl::property::queue::in_order(), queue::priority_high()}};
|
||||
for (const auto p : c10::irange(max_compile_time_stream_priorities)) {
|
||||
for (const auto i : c10::irange(kStreamsPerPool)) {
|
||||
streams[device][p][i] = std::make_unique<sycl::queue>(sycl::queue(
|
||||
c10::xpu::get_device_context(),
|
||||
c10::xpu::get_raw_device(device),
|
||||
c10::xpu::asyncHandler,
|
||||
properties[p]));
|
||||
}
|
||||
priority_counters[device][p] = 0;
|
||||
}
|
||||
}
|
||||
|
||||
void initXPUStreamsOnce() {
|
||||
c10::call_once(init_flag, initGlobalStreamState);
|
||||
|
||||
if (current_streams) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Inits current streams (thread local) to the last queue in the "normal
|
||||
// priority" queue pool. Note: the queue pool have not been initialized yet.
|
||||
// It will be initialized in initDeviceStreamState for the specified device.
|
||||
current_streams = std::make_unique<StreamId[]>(num_gpus);
|
||||
for (const auto i : c10::irange(num_gpus)) {
|
||||
// Assigning the current stream to the last one in the pool can be
|
||||
// beneficial in certain scenarios, particularly when users initialize their
|
||||
// workload to perform computations with the current stream (the last one)
|
||||
// and utilize stream (the first one) from the pool for communication, it
|
||||
// allows for different streams to overlap in computation and communication.
|
||||
current_streams[i] =
|
||||
makeStreamId(StreamIdType::NORMAL, kStreamsPerPool - 1);
|
||||
}
|
||||
}
|
||||
|
||||
// Creates the reserved sycl queue pools for the specified device to ensure
|
||||
// initialization only occurs once.
|
||||
inline void initDeviceStreamOnce(DeviceIndex device) {
|
||||
c10::call_once(device_flags[device], initDeviceStreamState, device);
|
||||
}
|
||||
|
||||
inline void check_device(DeviceIndex device) {
|
||||
TORCH_CHECK(
|
||||
device >= 0 && device < num_gpus,
|
||||
"device is out of range, device is ",
|
||||
static_cast<int16_t>(device),
|
||||
", total number of device is ",
|
||||
static_cast<int16_t>(num_gpus),
|
||||
".");
|
||||
}
|
||||
|
||||
uint32_t get_idx(std::atomic<uint32_t>& counter) {
|
||||
auto raw_idx = counter++;
|
||||
return raw_idx % kStreamsPerPool;
|
||||
}
|
||||
|
||||
XPUStream XPUStreamForId(DeviceIndex device_index, StreamId stream_id) {
|
||||
return XPUStream(
|
||||
XPUStream::UNCHECKED,
|
||||
Stream(
|
||||
Stream::UNSAFE,
|
||||
c10::Device(DeviceType::XPU, device_index),
|
||||
stream_id));
|
||||
}
|
||||
|
||||
} // anonymous namespace
|
||||
|
||||
int XPUStream::priority() const {
|
||||
StreamId stream_id = stream_.id();
|
||||
StreamIdType st = streamIdType(stream_id);
|
||||
// StreamIdType and priority number are inversely related.
|
||||
return -static_cast<int>(st);
|
||||
}
|
||||
|
||||
// See Note [StreamId assignment]
|
||||
sycl::queue& XPUStream::queue() const {
|
||||
DeviceIndex device_index = stream_.device_index();
|
||||
StreamId stream_id = stream_.id();
|
||||
StreamIdType st = streamIdType(stream_id);
|
||||
StreamIdIndex si = streamIdIndex(stream_id);
|
||||
switch (st) {
|
||||
case StreamIdType::NORMAL:
|
||||
case StreamIdType::HIGH:
|
||||
return *streams[device_index][static_cast<uint8_t>(st)][si];
|
||||
default:
|
||||
TORCH_CHECK(
|
||||
false,
|
||||
"Unrecognized stream ",
|
||||
stream_,
|
||||
" (I didn't recognize the stream type, ",
|
||||
st,
|
||||
").",
|
||||
" Did you manufacture the StreamId yourself? Don't do that;");
|
||||
}
|
||||
}
|
||||
|
||||
// Returns a stream from the requested pool
|
||||
// Note: The stream pools will be initialized if needed, at the first invocation
|
||||
// to this function.
|
||||
XPUStream getStreamFromPool(const int priority, DeviceIndex device) {
|
||||
initXPUStreamsOnce();
|
||||
if (device == -1) {
|
||||
device = c10::xpu::current_device();
|
||||
}
|
||||
check_device(device);
|
||||
TORCH_CHECK(
|
||||
priority <= 0,
|
||||
"Expected XPU stream priority to be less than or equal to 0, got ",
|
||||
priority);
|
||||
// Initializes the stream pools (once)
|
||||
initDeviceStreamOnce(device);
|
||||
auto priority_idx =
|
||||
std::min(-priority, max_compile_time_stream_priorities - 1);
|
||||
const auto idx = get_idx(priority_counters[device][priority_idx]);
|
||||
auto id_type = static_cast<StreamIdType>(priority_idx);
|
||||
return XPUStreamForId(device, makeStreamId(id_type, idx));
|
||||
}
|
||||
|
||||
XPUStream getStreamFromPool(const bool isHighPriority, DeviceIndex device) {
|
||||
initXPUStreamsOnce();
|
||||
// If isHighPriority is true, return the stream with the highest priority.
|
||||
int priority = isHighPriority ? -max_compile_time_stream_priorities + 1 : 0;
|
||||
return getStreamFromPool(priority, device);
|
||||
}
|
||||
|
||||
// Note: The stream pools will be initialized if needed, at the first invocation
|
||||
// to this function.
|
||||
XPUStream getCurrentXPUStream(DeviceIndex device) {
|
||||
initXPUStreamsOnce();
|
||||
if (device == -1) {
|
||||
device = c10::xpu::current_device();
|
||||
}
|
||||
check_device(device);
|
||||
// Initializes the stream pool (once)
|
||||
initDeviceStreamOnce(device);
|
||||
return XPUStreamForId(device, current_streams[device]);
|
||||
}
|
||||
|
||||
// Note: The stream pools will be initialized if needed, at the first invocation
|
||||
// to this function.
|
||||
void setCurrentXPUStream(XPUStream stream) {
|
||||
initXPUStreamsOnce();
|
||||
current_streams[stream.device_index()] = stream.id();
|
||||
}
|
||||
|
||||
std::ostream& operator<<(std::ostream& stream, const XPUStream& s) {
|
||||
return stream << s.unwrap();
|
||||
}
|
||||
|
||||
/*
|
||||
* Note [Synchronize Streams on Device]
|
||||
*
|
||||
* There are two stream pools per device to manage our reserved SYCL queues.
|
||||
* When syncStreamsOnDevice is called, all reserved SYCL queues in the pools of
|
||||
* the specified device will be blocked, and wait for their synchronizations. We
|
||||
* realize the semantics via a loop through the stream pools of the specified
|
||||
* device and make each command queue synchronization sequentially.
|
||||
*
|
||||
* There is a semantic gap with device synchronization because only the SYCL
|
||||
* queues we have reserved (in our pools) will be synchronized, rather than
|
||||
* synchronizing all SYCL queues on the specified device.
|
||||
*/
|
||||
|
||||
// Note: The stream pools will be initialized if needed, at the first invocation
|
||||
// to this function.
|
||||
void syncStreamsOnDevice(DeviceIndex device) {
|
||||
initXPUStreamsOnce();
|
||||
if (device == -1) {
|
||||
device = c10::xpu::current_device();
|
||||
}
|
||||
check_device(device);
|
||||
// Initializes the stream pools (once)
|
||||
initDeviceStreamOnce(device);
|
||||
|
||||
// For each device, we have kStreamsPerPool (32) reserved queues per priority.
|
||||
for (const auto p : c10::irange(max_compile_time_stream_priorities)) {
|
||||
for (const auto i : c10::irange(kStreamsPerPool)) {
|
||||
streams[device][p][i]->wait();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace c10::xpu
|
162
c10/xpu/XPUStream.h
Normal file
162
c10/xpu/XPUStream.h
Normal file
@ -0,0 +1,162 @@
|
||||
#pragma once
|
||||
|
||||
#include <c10/core/Stream.h>
|
||||
#include <c10/xpu/XPUFunctions.h>
|
||||
|
||||
namespace c10::xpu {
|
||||
|
||||
/*
|
||||
* Note [Stream Management]
|
||||
*
|
||||
* An XPUStream is an abstraction of an actual SYCL queue in which SYCL kernel
|
||||
* can execute. Currently, there are several pools per device to manage SYCL
|
||||
* queue, and a device's pool is lazily created.
|
||||
*
|
||||
* There are two pools per device. The first pool contains "normal priority"
|
||||
* queues. The second pool is the "high priority" queues. There are 32 queues in
|
||||
* per pool per device, and when a queue is requested one of these queues is
|
||||
* returned round-robin. That is, the first queue requested is at index 0, the
|
||||
* second at index 1... to index 31, then index 0 again.
|
||||
*
|
||||
* This means that if 33 queues are requested, the first and last queues
|
||||
* requested are actually the same queue (under the covers) and kernels enqueued
|
||||
* on them cannot run concurrently.
|
||||
*
|
||||
* It is safe to enqueue a kernel on the same queue from two different
|
||||
* threads as the SYCL specification described.
|
||||
*/
|
||||
|
||||
static constexpr int max_compile_time_stream_priorities = 2;
|
||||
|
||||
/*
|
||||
* This serves as a wrapper around c10::Stream and acts as a representation for
|
||||
* a SYCL queue. On each device, a SYCL queue pool consists of kStreamsPerPool
|
||||
* queues, and you can access a particular queue by its index. The index is
|
||||
* extracted from XPUStream.id().
|
||||
*/
|
||||
class C10_XPU_API XPUStream {
|
||||
public:
|
||||
enum Unchecked { UNCHECKED };
|
||||
|
||||
// Construct a XPUStream from a Stream. This construction is checked, and
|
||||
// will raise an error if the Stream is not, in fact, a XPU stream.
|
||||
explicit XPUStream(Stream stream) : stream_(stream) {
|
||||
TORCH_CHECK(stream_.device_type() == DeviceType::XPU);
|
||||
}
|
||||
|
||||
// Construct a XPUStream from a Stream with no error checking.
|
||||
explicit XPUStream(Unchecked, Stream stream) : stream_(stream) {}
|
||||
|
||||
bool operator==(const XPUStream& other) const noexcept {
|
||||
return unwrap() == other.unwrap();
|
||||
}
|
||||
|
||||
bool operator!=(const XPUStream& other) const noexcept {
|
||||
return unwrap() != other.unwrap();
|
||||
}
|
||||
|
||||
operator sycl::queue&() const {
|
||||
return queue();
|
||||
}
|
||||
|
||||
operator Stream() const {
|
||||
return unwrap();
|
||||
}
|
||||
|
||||
DeviceType device_type() const {
|
||||
return DeviceType::XPU;
|
||||
}
|
||||
|
||||
DeviceIndex device_index() const {
|
||||
return stream_.device_index();
|
||||
}
|
||||
|
||||
Device device() const {
|
||||
return Device(DeviceType::XPU, device_index());
|
||||
}
|
||||
|
||||
// Return the stream ID corresponding to this particular stream. StreamId is
|
||||
/// a int64_t representation generated by its type and index.
|
||||
StreamId id() const {
|
||||
return stream_.id();
|
||||
}
|
||||
|
||||
bool query() const {
|
||||
return queue().ext_oneapi_empty();
|
||||
}
|
||||
|
||||
void synchronize() const {
|
||||
queue().wait_and_throw();
|
||||
}
|
||||
|
||||
int priority() const;
|
||||
|
||||
// Explicit conversion to sycl::queue&.
|
||||
sycl::queue& queue() const;
|
||||
|
||||
Stream unwrap() const {
|
||||
return stream_;
|
||||
}
|
||||
|
||||
struct c10::StreamData3 pack3() const {
|
||||
return stream_.pack3();
|
||||
}
|
||||
|
||||
static XPUStream unpack3(
|
||||
StreamId stream_id,
|
||||
DeviceIndex device_index,
|
||||
DeviceType device_type) {
|
||||
return XPUStream(Stream::unpack3(stream_id, device_index, device_type));
|
||||
}
|
||||
|
||||
static std::tuple<int, int> priority_range() {
|
||||
return std::make_tuple(0, -max_compile_time_stream_priorities + 1);
|
||||
}
|
||||
|
||||
private:
|
||||
Stream stream_;
|
||||
};
|
||||
|
||||
/**
|
||||
* Get a stream from the pool in a round-robin fashion.
|
||||
*
|
||||
* You can request a stream from the high priority pool by setting
|
||||
* isHighPriority to true, or a priority value for a specific device by setting
|
||||
* device.
|
||||
*/
|
||||
C10_XPU_API XPUStream
|
||||
getStreamFromPool(const bool isHighPriority = false, DeviceIndex device = -1);
|
||||
// The priority number lower, the priority higher.
|
||||
C10_XPU_API XPUStream
|
||||
getStreamFromPool(const int priority, DeviceIndex device = -1);
|
||||
|
||||
/**
|
||||
* Get the current XPU stream, for the passed XPU device, or for the current
|
||||
* device if no device index is passed.
|
||||
*/
|
||||
C10_XPU_API XPUStream getCurrentXPUStream(DeviceIndex device = -1);
|
||||
|
||||
/**
|
||||
* Set the current stream on the device of the passed in stream to be the passed
|
||||
* in stream.
|
||||
*/
|
||||
C10_XPU_API void setCurrentXPUStream(XPUStream stream);
|
||||
|
||||
C10_XPU_API std::ostream& operator<<(std::ostream& stream, const XPUStream& s);
|
||||
|
||||
/**
|
||||
* Block all reserved SYCL queues in the stream pools on the device, and wait
|
||||
* for their synchronizations.
|
||||
*/
|
||||
C10_XPU_API void syncStreamsOnDevice(DeviceIndex device = -1);
|
||||
|
||||
} // namespace c10::xpu
|
||||
|
||||
namespace std {
|
||||
template <>
|
||||
struct hash<c10::xpu::XPUStream> {
|
||||
size_t operator()(c10::xpu::XPUStream s) const noexcept {
|
||||
return std::hash<c10::Stream>{}(s.unwrap());
|
||||
}
|
||||
};
|
||||
} // namespace std
|
@ -2,6 +2,7 @@
|
||||
|
||||
set(C10_XPU_ALL_TEST_FILES
|
||||
impl/XPUDeviceTest.cpp
|
||||
impl/XPUStreamTest.cpp
|
||||
)
|
||||
if(BUILD_TEST)
|
||||
foreach(test_src ${C10_XPU_ALL_TEST_FILES})
|
||||
|
193
c10/xpu/test/impl/XPUStreamTest.cpp
Normal file
193
c10/xpu/test/impl/XPUStreamTest.cpp
Normal file
@ -0,0 +1,193 @@
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include <c10/util/Optional.h>
|
||||
#include <c10/util/irange.h>
|
||||
#include <c10/xpu/XPUStream.h>
|
||||
|
||||
#include <thread>
|
||||
#include <unordered_set>
|
||||
|
||||
bool has_xpu() {
|
||||
return c10::xpu::device_count() > 0;
|
||||
}
|
||||
|
||||
TEST(XPUStreamTest, CopyAndMoveTest) {
|
||||
if (!has_xpu()) {
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t device = -1;
|
||||
sycl::queue queue;
|
||||
c10::xpu::XPUStream copyStream = c10::xpu::getStreamFromPool();
|
||||
{
|
||||
auto s = c10::xpu::getStreamFromPool();
|
||||
device = s.device_index();
|
||||
queue = s.queue();
|
||||
|
||||
copyStream = s;
|
||||
|
||||
EXPECT_EQ(copyStream.device_index(), device);
|
||||
EXPECT_EQ(copyStream.queue(), queue);
|
||||
}
|
||||
|
||||
EXPECT_EQ(copyStream.device_index(), device);
|
||||
EXPECT_EQ(copyStream.queue(), queue);
|
||||
|
||||
// Tests that moving works as expected and preserves the stream
|
||||
c10::xpu::XPUStream moveStream = c10::xpu::getStreamFromPool();
|
||||
{
|
||||
auto s = c10::xpu::getStreamFromPool();
|
||||
device = s.device_index();
|
||||
queue = s.queue();
|
||||
|
||||
moveStream = std::move(s);
|
||||
|
||||
EXPECT_EQ(moveStream.device_index(), device);
|
||||
EXPECT_EQ(moveStream.queue(), queue);
|
||||
}
|
||||
|
||||
EXPECT_EQ(moveStream.device_index(), device);
|
||||
EXPECT_EQ(moveStream.queue(), queue);
|
||||
}
|
||||
|
||||
TEST(XPUStreamTest, StreamBehavior) {
|
||||
if (!has_xpu()) {
|
||||
return;
|
||||
}
|
||||
|
||||
c10::xpu::XPUStream stream = c10::xpu::getStreamFromPool();
|
||||
EXPECT_EQ(stream.device_type(), c10::kXPU);
|
||||
c10::xpu::setCurrentXPUStream(stream);
|
||||
c10::xpu::XPUStream cur_stream = c10::xpu::getCurrentXPUStream();
|
||||
|
||||
EXPECT_EQ(cur_stream, stream);
|
||||
EXPECT_EQ(stream.priority(), 0);
|
||||
|
||||
auto [least_priority, greatest_priority] =
|
||||
c10::xpu::XPUStream::priority_range();
|
||||
EXPECT_EQ(least_priority, 0);
|
||||
EXPECT_TRUE(greatest_priority < 0);
|
||||
|
||||
stream = c10::xpu::getStreamFromPool(/* isHighPriority */ true);
|
||||
EXPECT_TRUE(stream.priority() < 0);
|
||||
|
||||
if (c10::xpu::device_count() <= 1) {
|
||||
return;
|
||||
}
|
||||
|
||||
c10::xpu::set_device(0);
|
||||
stream = c10::xpu::getStreamFromPool(false, 1);
|
||||
EXPECT_EQ(stream.device_index(), 1);
|
||||
EXPECT_NE(stream.device_index(), c10::xpu::current_device());
|
||||
}
|
||||
|
||||
void thread_fun(c10::optional<c10::xpu::XPUStream>& cur_thread_stream) {
|
||||
auto new_stream = c10::xpu::getStreamFromPool();
|
||||
c10::xpu::setCurrentXPUStream(new_stream);
|
||||
cur_thread_stream = {c10::xpu::getCurrentXPUStream()};
|
||||
EXPECT_EQ(*cur_thread_stream, new_stream);
|
||||
}
|
||||
|
||||
// Ensures streams are thread local
|
||||
TEST(XPUStreamTest, MultithreadStreamBehavior) {
|
||||
if (!has_xpu()) {
|
||||
return;
|
||||
}
|
||||
c10::optional<c10::xpu::XPUStream> s0, s1;
|
||||
|
||||
std::thread t0{thread_fun, std::ref(s0)};
|
||||
std::thread t1{thread_fun, std::ref(s1)};
|
||||
t0.join();
|
||||
t1.join();
|
||||
|
||||
c10::xpu::XPUStream cur_stream = c10::xpu::getCurrentXPUStream();
|
||||
|
||||
EXPECT_NE(cur_stream, *s0);
|
||||
EXPECT_NE(cur_stream, *s1);
|
||||
EXPECT_NE(s0, s1);
|
||||
}
|
||||
|
||||
// Ensure queue pool round-robin fashion
|
||||
TEST(XPUStreamTest, StreamPoolRoundRobinTest) {
|
||||
if (!has_xpu()) {
|
||||
return;
|
||||
}
|
||||
|
||||
std::vector<c10::xpu::XPUStream> streams{};
|
||||
for (C10_UNUSED const auto _ : c10::irange(200)) {
|
||||
streams.emplace_back(c10::xpu::getStreamFromPool());
|
||||
}
|
||||
|
||||
std::unordered_set<sycl::queue> queue_set{};
|
||||
bool hasDuplicates = false;
|
||||
for (const auto i : c10::irange(streams.size())) {
|
||||
auto& queue = streams[i].queue();
|
||||
auto result_pair = queue_set.insert(queue);
|
||||
if (!result_pair.second) { // already existed
|
||||
hasDuplicates = true;
|
||||
} else { // newly inserted
|
||||
EXPECT_TRUE(!hasDuplicates);
|
||||
}
|
||||
}
|
||||
EXPECT_TRUE(hasDuplicates);
|
||||
|
||||
auto stream = c10::xpu::getStreamFromPool(/* isHighPriority */ true);
|
||||
auto result_pair = queue_set.insert(stream.queue());
|
||||
EXPECT_TRUE(result_pair.second);
|
||||
}
|
||||
|
||||
void asyncMemCopy(sycl::queue& queue, int* dst, int* src, size_t numBytes) {
|
||||
queue.memcpy(dst, src, numBytes);
|
||||
}
|
||||
|
||||
void clearHostData(int* hostData, int numel) {
|
||||
for (const auto i : c10::irange(numel)) {
|
||||
hostData[i] = 0;
|
||||
}
|
||||
}
|
||||
|
||||
void validateHostData(int* hostData, int numel) {
|
||||
for (const auto i : c10::irange(numel)) {
|
||||
EXPECT_EQ(hostData[i], i);
|
||||
}
|
||||
}
|
||||
|
||||
TEST(XPUStreamTest, StreamFunction) {
|
||||
if (!has_xpu()) {
|
||||
return;
|
||||
}
|
||||
|
||||
constexpr int numel = 1024;
|
||||
int hostData[numel];
|
||||
for (const auto i : c10::irange(numel)) {
|
||||
hostData[i] = i;
|
||||
}
|
||||
|
||||
auto stream = c10::xpu::getStreamFromPool();
|
||||
EXPECT_TRUE(stream.query());
|
||||
int* deviceData = sycl::malloc_device<int>(numel, stream);
|
||||
|
||||
// H2D
|
||||
asyncMemCopy(stream, deviceData, hostData, sizeof(int) * numel);
|
||||
c10::xpu::syncStreamsOnDevice();
|
||||
EXPECT_TRUE(stream.query());
|
||||
|
||||
clearHostData(hostData, numel);
|
||||
|
||||
// D2H
|
||||
asyncMemCopy(stream, hostData, deviceData, sizeof(int) * numel);
|
||||
c10::xpu::syncStreamsOnDevice();
|
||||
|
||||
validateHostData(hostData, numel);
|
||||
|
||||
stream = c10::xpu::getStreamFromPool(-1);
|
||||
|
||||
clearHostData(hostData, numel);
|
||||
|
||||
// D2H
|
||||
asyncMemCopy(stream, hostData, deviceData, sizeof(int) * numel);
|
||||
c10::xpu::syncStreamsOnDevice();
|
||||
|
||||
validateHostData(hostData, numel);
|
||||
sycl::free(deviceData, c10::xpu::get_device_context());
|
||||
}
|
Reference in New Issue
Block a user