From 4438796b4831d9bfea6fa843a85373f9fc33da39 Mon Sep 17 00:00:00 2001 From: Robert Shaw Date: Sun, 20 Jul 2025 03:44:38 +0000 Subject: [PATCH] fix lb issues Signed-off-by: Robert Shaw --- vllm/v1/engine/async_llm.py | 1 + vllm/v1/engine/core_client.py | 12 +++++++++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/vllm/v1/engine/async_llm.py b/vllm/v1/engine/async_llm.py index 6395d2c1875..6736ed72734 100644 --- a/vllm/v1/engine/async_llm.py +++ b/vllm/v1/engine/async_llm.py @@ -325,6 +325,7 @@ class AsyncLLM(EngineClient): # Note: drain queue without await if possible (avoids # task switching under load which helps performance). out = q.get_nowait() or await q.get() + # Note: both OutputProcessor and EngineCore handle their # own request cleanup based on finished. diff --git a/vllm/v1/engine/core_client.py b/vllm/v1/engine/core_client.py index 2d574efd03c..127c4fb8473 100644 --- a/vllm/v1/engine/core_client.py +++ b/vllm/v1/engine/core_client.py @@ -709,8 +709,12 @@ class AsyncMPClient(MPClient): assert output_socket is not None async def process_outputs_socket(): + i = 0 try: while True: + if i % 100 == 0: + logger.info(f"{i=}") + i += 1 frames = await output_socket.recv_multipart(copy=False) resources.validate_alive(frames) outputs: EngineCoreOutputs = decoder.decode(frames) @@ -895,6 +899,8 @@ class DPAsyncMPClient(AsyncMPClient): return assert self.stats_update_address is not None + dp_start_rank = self.vllm_config.parallel_config.data_parallel_rank + dp_end_rank = dp_start_rank + self.vllm_config.parallel_config.data_parallel_size_local async def run_engine_stats_update_task(): with make_zmq_socket(self.ctx, self.stats_update_address, @@ -959,7 +965,7 @@ class DPAsyncMPClient(AsyncMPClient): counts, wave, running = msgspec.msgpack.decode(buf) self.current_wave = wave self.engines_running = running - self.lb_engines = counts + self.lb_engines = counts[dp_start_rank:dp_end_rank] resources.stats_update_task = asyncio.create_task( run_engine_stats_update_task()) @@ -973,6 +979,7 @@ class DPAsyncMPClient(AsyncMPClient): chosen_engine = self.get_core_engine_for_request(request) to_await = self._send_input(EngineCoreRequestType.ADD, request, chosen_engine) + if not self.engines_running: # Notify coordinator that we're sending a request req_msg = msgspec.msgpack.encode(("FIRST_REQ", chosen_engine)) @@ -1007,6 +1014,9 @@ class DPLBAsyncMPClient(DPAsyncMPClient): def get_core_engine_for_request( self, request: EngineCoreRequest) -> EngineIdentity: + logger.info(f"{self.lb_engines=} | {request.data_parallel_rank=}") + logger.info(f"{self.core_engine=}") + logger.info(f"{self.client_index=}") # Engines are in rank order. if (eng_index := request.data_parallel_rank) is None: if not self.lb_engines: