mirror of
https://github.com/vllm-project/vllm.git
synced 2025-10-20 14:53:52 +08:00
@ -325,6 +325,7 @@ class AsyncLLM(EngineClient):
|
||||
# Note: drain queue without await if possible (avoids
|
||||
# task switching under load which helps performance).
|
||||
out = q.get_nowait() or await q.get()
|
||||
|
||||
|
||||
# Note: both OutputProcessor and EngineCore handle their
|
||||
# own request cleanup based on finished.
|
||||
|
@ -709,8 +709,12 @@ class AsyncMPClient(MPClient):
|
||||
assert output_socket is not None
|
||||
|
||||
async def process_outputs_socket():
|
||||
i = 0
|
||||
try:
|
||||
while True:
|
||||
if i % 100 == 0:
|
||||
logger.info(f"{i=}")
|
||||
i += 1
|
||||
frames = await output_socket.recv_multipart(copy=False)
|
||||
resources.validate_alive(frames)
|
||||
outputs: EngineCoreOutputs = decoder.decode(frames)
|
||||
@ -895,6 +899,8 @@ class DPAsyncMPClient(AsyncMPClient):
|
||||
return
|
||||
|
||||
assert self.stats_update_address is not None
|
||||
dp_start_rank = self.vllm_config.parallel_config.data_parallel_rank
|
||||
dp_end_rank = dp_start_rank + self.vllm_config.parallel_config.data_parallel_size_local
|
||||
|
||||
async def run_engine_stats_update_task():
|
||||
with make_zmq_socket(self.ctx, self.stats_update_address,
|
||||
@ -959,7 +965,7 @@ class DPAsyncMPClient(AsyncMPClient):
|
||||
counts, wave, running = msgspec.msgpack.decode(buf)
|
||||
self.current_wave = wave
|
||||
self.engines_running = running
|
||||
self.lb_engines = counts
|
||||
self.lb_engines = counts[dp_start_rank:dp_end_rank]
|
||||
|
||||
resources.stats_update_task = asyncio.create_task(
|
||||
run_engine_stats_update_task())
|
||||
@ -973,6 +979,7 @@ class DPAsyncMPClient(AsyncMPClient):
|
||||
chosen_engine = self.get_core_engine_for_request(request)
|
||||
to_await = self._send_input(EngineCoreRequestType.ADD, request,
|
||||
chosen_engine)
|
||||
|
||||
if not self.engines_running:
|
||||
# Notify coordinator that we're sending a request
|
||||
req_msg = msgspec.msgpack.encode(("FIRST_REQ", chosen_engine))
|
||||
@ -1007,6 +1014,9 @@ class DPLBAsyncMPClient(DPAsyncMPClient):
|
||||
|
||||
def get_core_engine_for_request(
|
||||
self, request: EngineCoreRequest) -> EngineIdentity:
|
||||
logger.info(f"{self.lb_engines=} | {request.data_parallel_rank=}")
|
||||
logger.info(f"{self.core_engine=}")
|
||||
logger.info(f"{self.client_index=}")
|
||||
# Engines are in rank order.
|
||||
if (eng_index := request.data_parallel_rank) is None:
|
||||
if not self.lb_engines:
|
||||
|
Reference in New Issue
Block a user