[DP] Create placement groups by ray_device_key (#25026)

Signed-off-by: Xinyu Chen <xinyu1.chen@intel.com>
Co-authored-by: Kunshang Ji <kunshang.ji@intel.com>
This commit is contained in:
Xinyu Chen
2025-09-17 16:57:25 +08:00
committed by GitHub
parent 0fb2551c23
commit bb58dc8c20

View File

@ -334,20 +334,22 @@ class CoreEngineActorManager:
"No nodes with resources found in Ray cluster.")
assert dp_master_ip_key in nodes[0], (
"The DP master node (ip: %s) is missing or dead", dp_master_ip)
device_str = current_platform.ray_device_key
for node_resources in nodes:
if "GPU" not in node_resources:
if device_str not in node_resources:
continue
# For now, each DP rank can only be assigned to one node
# TODO(rui): support allocating a single DP rank
# to multiple nodes
available_engine_count = int(node_resources["GPU"]) // world_size
available_engine_count = int(
node_resources[device_str]) // world_size
if dp_master_ip_key in node_resources:
assert available_engine_count >= local_engine_count, (
"Not enough resources to allocate DP ranks "
f"on DP master node {dp_master_ip}")
for i in range(local_engine_count):
bundles = [{
"GPU": 1.0,
device_str: 1.0,
"node:" + dp_master_ip: 0.001
}] * world_size + [{
"CPU": 1.0
@ -363,7 +365,7 @@ class CoreEngineActorManager:
for i in range(available_engine_count):
if len(placement_groups) == num_pg_to_create:
break
bundles = [{"GPU": 1.0}] * world_size + [{"CPU": 1.0}]
bundles = [{device_str: 1.0}] * world_size + [{"CPU": 1.0}]
pg = ray.util.placement_group(
name=f"dp_rank_{len(placement_groups)}",
strategy="STRICT_PACK",
@ -415,17 +417,18 @@ class CoreEngineActorManager:
local_dp_ranks = []
num_pg_created = 0
device_str = current_platform.ray_device_key
for node in nodes:
if num_pg_created >= num_pg_to_create:
break
node_ip = node.node_ip
node_id = node.node_id
available_gpus = int(available_resources[node_id]["GPU"])
available_gpus = int(available_resources[node_id][device_str])
# Get total GPUs on this node from the node's resources
# Ray stores node resources with node ID as key
total_gpus = int(total_resources[node_id]["GPU"])
total_gpus = int(total_resources[node_id][device_str])
# Calculate used GPUs and used engines on this node
used_gpus = max(0, total_gpus - available_gpus)
@ -444,13 +447,13 @@ class CoreEngineActorManager:
# Create bundles with node constraint for master node
if node_ip == dp_master_ip:
bundles = [{
"GPU": 1.0,
device_str: 1.0,
"node:" + dp_master_ip: 0.001
}] * world_size + [{
"CPU": 1.0
}]
else:
bundles = [{"GPU": 1.0}] * world_size + [{"CPU": 1.0}]
bundles = [{device_str: 1.0}] * world_size + [{"CPU": 1.0}]
pg = ray.util.placement_group(
name=f"dp_rank_{rank}",