Refine XPU external Stream (#142347)

Pull Request resolved: https://github.com/pytorch/pytorch/pull/142347
Approved by: https://github.com/gujinghui, https://github.com/albanD
This commit is contained in:
Yu, Guangye
2024-12-31 09:55:37 +00:00
committed by PyTorch MergeBot
parent 16a57e232c
commit 39450ae655
4 changed files with 151 additions and 30 deletions

View File

@ -37,27 +37,30 @@ thread_local std::unique_ptr<StreamId[]> current_streams = nullptr;
// ~~~~~~~~~~~~~~~~~~~~~~~~~~
// How do we assign stream IDs?
//
// -- 56 bits -- -- 5 bits ----- -- 3 bits -- -- 1 bits --
// zeros StreamIdIndex StreamIdType Ext/native stream
// -- 55 bits -- -- 5 bits -- -- 3 bits -- -- 1 bit --
// zeros StreamIdIndex StreamIdType Ext/native stream
// ignored for ext ignored for ext
//
// Where StreamIdType:
// 000 = normal priority queue
// 001 = high priority queue
// 111 = external queue
//
// for external stream, StreamID is a sycl::queue* pointer
// this means that last bit will always be 0
// so when constructing StreamId for a native stream we set last bit to 1
// to distinguish between native and external streams
// For external stream, StreamID is a sycl::queue* pointer. This means that last
// bit will always be 0. So when constructing StreamId for a native stream we
// set last bit to 1 to distinguish between native and external streams. For
// more details, see Note [External XPU Stream].
//
// StreamId is 64-bit, so we can just rely on regular promotion rules.
// We rely on StreamIdIndex and StreamIdType being non-negative;
using StreamIdIndex = uint8_t;
// Classifies the queue that a StreamId refers to.
enum class StreamIdType : uint8_t {
  // Native streams: a larger type number means a higher priority.
  NORMAL = 0x0,
  HIGH = 0x1,
  // External stream: recognized by the last bit of StreamId being 0. Its
  // priority is not encoded here; it is queried at runtime.
  EXT = 0x7,
};
@ -76,9 +79,9 @@ inline std::ostream& operator<<(std::ostream& stream, StreamIdType q) {
}
inline StreamIdType streamIdType(StreamId s) {
// Externally allocated streams have their id being the sycl:queue* pointer
// so the last bit will be 0
if ((!(s & 1) && s)) {
// Externally allocated streams have their id being the sycl::queue* pointer.
// So the last bit will be 0.
if ((!(s & 1))) {
return StreamIdType(StreamIdType::EXT);
}
int mask_for_type = (1 << kStreamTypeBits) - 1;
@ -180,13 +183,16 @@ XPUStream XPUStreamForId(DeviceIndex device_index, StreamId stream_id) {
int XPUStream::priority() const {
StreamId stream_id = stream_.id();
StreamIdType st = streamIdType(stream_id);
// For an external queue which is not created in XPUStream, we can not trace
// the priority. Workaround here since sycl doesn't support get priority from
// a sycl::queue, like cudaStreamGetPriority .
// TODO: remove this workaround when sycl supports get priority from a
// sycl::queue.
if (st == StreamIdType::EXT) {
if (C10_UNLIKELY(st == StreamIdType::EXT)) {
// Query external stream priority
using namespace sycl::ext::oneapi::property;
// Default priority for SYCL queue is normal.
st = StreamIdType::NORMAL;
if (queue().has_property<queue::priority_normal>()) {
st = StreamIdType::NORMAL;
} else if (queue().has_property<queue::priority_high>()) {
st = StreamIdType::HIGH;
}
}
// StreamIdType and priority number are inversely related.
return -static_cast<int>(st);
@ -199,11 +205,12 @@ sycl::queue& XPUStream::queue() const {
StreamIdType st = streamIdType(stream_id);
StreamIdIndex si = streamIdIndex(stream_id);
switch (st) {
case StreamIdType::EXT:
return *(reinterpret_cast<sycl::queue*>(stream_id));
case StreamIdType::NORMAL:
case StreamIdType::HIGH:
return *streams[device_index][static_cast<uint8_t>(st)][si];
// See Note [External XPU Stream]
case StreamIdType::EXT:
return *(reinterpret_cast<sycl::queue*>(stream_id));
default:
TORCH_CHECK(
false,
@ -245,13 +252,54 @@ XPUStream getStreamFromPool(const bool isHighPriority, DeviceIndex device) {
return getStreamFromPool(priority, device);
}
/*
* Note [External XPU Stream]
*
* An external XPUStream is a wrapper around an external SYCL queue that was not
* created by PyTorch. This design enables interoperability with other libraries
* by allowing PyTorch to work seamlessly with SYCL queues created outside of
* its control.
*
* Key design requirements include:
* 1. Allowing retrieval of its SYCL queue from the external XPUStream.
* 2. Supporting conversion between an external XPUStream and a `c10::Stream`.
* 3. Ensuring compatibility with the `get/setCurrentXPUStream` methods.
* 4. Enabling memory caching allocation through the external XPUStream.
*
* To address requirements (1) and (2), we associate the external SYCL queue
* pointer with the `stream_id`. It is the user's responsibility to ensure that
* the referenced SYCL queue remains alive while the corresponding XPUStream, or
* any c10::Stream derived from it, is in use.
*
* However, this approach introduces the following limitations:
*
* 1. Different SYCL queue pointers will result in distinct XPUStream
* instances, even if the SYCL queues they dereference are equivalent.
* 2. Memory blocks allocated by one external XPUStream CANNOT be reused by
* other non-equivalent XPUStreams, even if they originate from the same SYCL
* queue object.
*/
XPUStream getStreamFromExternal(
sycl::queue* ext_stream,
sycl::queue* ext_queue,
DeviceIndex device_index) {
// The sycl::queue* will be the actual id
TORCH_CHECK(ext_stream, "External stream must not be a nullptr.");
return XPUStreamForId(device_index, reinterpret_cast<int64_t>(ext_stream));
TORCH_CHECK(ext_queue, "External sycl::queue* must not be a nullptr.");
TORCH_CHECK(
ext_queue->is_in_order(), "External SYCL queue must be in-order.");
TORCH_CHECK(
ext_queue->get_context() == c10::xpu::get_device_context(),
"External SYCL queue must be created with the same context as the PyTorch XPU used.");
TORCH_CHECK(
ext_queue->get_device() == c10::xpu::get_raw_device(device_index),
"External SYCL queue doesn't match the given device index.");
StreamId stream_id = reinterpret_cast<StreamId>(ext_queue);
TORCH_CHECK(
!(stream_id & 1),
"External sycl::queue* must have the last bit set to 0. ",
"You can file an issue at https://github.com/pytorch/pytorch/issues to describe your use case.");
return XPUStreamForId(device_index, stream_id);
}
// Note: The stream pools will be initialized if needed, at the first invocation

View File

@ -158,14 +158,21 @@ C10_XPU_API XPUStream
getStreamFromPool(const int priority, DeviceIndex device = -1);
/**
* Get a XPUStream from a externally allocated one.
* Get an XPUStream from an external SYCL queue.
*
* This is mainly for interoperability with different libraries where we
* want to operate on a non-torch allocated stream for data exchange or similar
* purposes
* This function allows interoperability with other libraries by enabling
* the use of an external SYCL queue that was not created by PyTorch. This
* can be useful for data exchange or other operations where integration
* with non-PyTorch queues is required.
*
* NOTE: It is the user's responsibility to ensure that the referenced SYCL
* queue remains alive while the corresponding XPUStream, or any c10::Stream
* derived from it, is in use. Different SYCL queue pointers result in distinct
* XPUStream instances, even if the SYCL queues they dereference are
* equivalent.
*/
C10_API XPUStream
getStreamFromExternal(sycl::queue* ext_stream, DeviceIndex device_index);
C10_XPU_API XPUStream
getStreamFromExternal(sycl::queue* ext_queue, DeviceIndex device_index);
/**
* Get the current XPU stream, for the passed XPU device, or for the current

View File

@ -2,6 +2,7 @@
#include <c10/util/irange.h>
#include <c10/xpu/XPUCachingAllocator.h>
#include <c10/xpu/XPUException.h>
bool has_xpu() {
return c10::xpu::device_count() > 0;
@ -75,6 +76,46 @@ TEST(XPUCachingAllocatorTest, AllocateMemory) {
for (const auto i : c10::irange(numel)) {
EXPECT_EQ(hostData[i], i);
}
c10::xpu::XPUCachingAllocator::emptyCache();
}
// Checks that memory reserved through an external XPUStream is cached and then
// reused by later allocations issued on that same stream.
TEST(XPUCachingAllocatorTest, DeviceCachingAllocateByExternalStream) {
  c10::xpu::XPUCachingAllocator::emptyCache();
  auto* allocator = c10::xpu::XPUCachingAllocator::get();
  // The queue is held by value and declared before any allocation, so it
  // outlives every stream and memory block that refers to it and is released
  // automatically at the end of the test.
  sycl::queue ext_queue(
      c10::xpu::get_device_context(),
      c10::xpu::get_raw_device(0),
      c10::xpu::asyncHandler,
      {sycl::property::queue::in_order()});
  // Remember the stream that is current on device 0 so it can be restored
  // below; otherwise the thread's current stream would keep pointing at
  // `ext_queue` after the queue has been destroyed.
  const c10::xpu::XPUStream prev_stream = c10::xpu::getCurrentXPUStream(0);
  const auto _500mb = 500 * 1024 * 1024;
  const auto _10mb = 10 * 1024 * 1024;
  // 500M memory is reserved, can be reused later.
  {
    c10::xpu::XPUStream ext_stream =
        c10::xpu::getStreamFromExternal(&ext_queue, 0);
    c10::xpu::setCurrentXPUStream(ext_stream);
    auto cache = allocator->allocate(_500mb);
  }
  // The allocations live in their own scope so that `buffer` is handed back to
  // the allocator before the final emptyCache() call.
  {
    auto buffer = allocator->allocate(_10mb);
    void* ptr0 = buffer.get();
    // tmp is not allocated via device caching allocator.
    void* tmp = sycl::aligned_alloc_device(
        512,
        _10mb,
        c10::xpu::get_raw_device(0),
        c10::xpu::get_device_context());
    void* ptr1 = c10::xpu::XPUCachingAllocator::raw_alloc(_10mb);
    // We have reserved 500M of memory for reuse. When allocating `ptr0` and
    // `ptr1` through the device caching allocator, they should be allocated
    // from the same block. Specifically, `ptr1` should follow immediately
    // after `ptr0` in the block, forming a sequence like [ptr0, ptr1]. This
    // behavior occurs because the `tmp` pointer is not allocated through the
    // device caching allocator, meaning it cannot reuse the reserved memory.
    // As a result, the offset between `ptr0` and `ptr1` should match the size
    // of `ptr0` (10M in this case).
    auto diff = static_cast<char*>(ptr1) - static_cast<char*>(ptr0);
    EXPECT_EQ(diff, _10mb);
    c10::xpu::XPUCachingAllocator::raw_delete(ptr1);
    sycl::free(tmp, c10::xpu::get_device_context());
  }
  // Stop using the external stream while its queue is still alive.
  c10::xpu::setCurrentXPUStream(prev_stream);
  c10::xpu::XPUCachingAllocator::emptyCache();
}
int main(int argc, char* argv[]) {

View File

@ -202,6 +202,7 @@ TEST(XPUStreamTest, ExternalTest) {
at::xpu::setCurrentXPUStream(myStream);
at::xpu::XPUStream curStream = at::xpu::getCurrentXPUStream();
EXPECT_EQ(myStream.priority(), 0);
ASSERT_TRUE(curStream == myStream);
ASSERT_TRUE(&(curStream.queue()) == stream);
@ -230,7 +231,7 @@ TEST(XPUStreamTest, ExternalMultiDeviceTest) {
}
{
c10::DeviceGuard device_guard(c10::Device(c10::DeviceType::XPU, 1));
stream_0 = new sycl::queue(
stream_1 = new sycl::queue(
c10::xpu::get_device_context(),
c10::xpu::get_raw_device(1),
c10::xpu::asyncHandler,
@ -247,4 +248,28 @@ TEST(XPUStreamTest, ExternalMultiDeviceTest) {
delete stream_0;
delete stream_1;
}
}
// Two different sycl::queue pointers must yield two different external
// XPUStreams, even when both queues are copies of one and the same queue.
TEST(XPUStreamTest, ExternalStreamDifferentPointersTest) {
  if (!has_xpu()) {
    return;
  }

  namespace oneapi_prop = sycl::ext::oneapi::property;
  sycl::queue base_queue(
      c10::xpu::get_device_context(),
      c10::xpu::get_raw_device(0),
      c10::xpu::asyncHandler,
      {sycl::property::queue::in_order(),
       oneapi_prop::queue::priority_normal()});

  // Pointers to copies of the queue lead to distinct external XPUStreams.
  const auto first_copy = std::make_unique<sycl::queue>(base_queue);
  const auto second_copy = std::make_unique<sycl::queue>(base_queue);

  at::xpu::XPUStream first_stream =
      at::xpu::getStreamFromExternal(first_copy.get(), 0);
  at::xpu::XPUStream second_stream =
      at::xpu::getStreamFromExternal(second_copy.get(), 0);
  EXPECT_NE(first_stream, second_stream);
}