Mirror of https://github.com/vllm-project/vllm.git (synced 2025-10-20 14:53:52 +08:00)
[BugFix] Fix stuck stats/metrics after requests are aborted (#22995)
Signed-off-by: Nick Hill <nhill@redhat.com>
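What the diff below amounts to: the scheduler only attached per-step stats to an EngineCoreOutputs object when the step already produced request outputs, so a step that did nothing but process aborts never delivered the freshly zeroed gauges, and Prometheus kept reporting the last non-zero values. As a way to observe this end to end, here is a minimal standalone reproduction sketch. It is not part of the commit: BASE_URL and MODEL are placeholders, and it assumes a vLLM OpenAI-compatible server is already running. It starts one long completion, aborts it by cancelling the client task, and prints the three gauges the new test asserts on.

# Standalone reproduction sketch (not part of the commit). Assumes a vLLM
# OpenAI-compatible server is already running; BASE_URL and MODEL are
# placeholders to adjust.
import asyncio

import openai
import requests
from prometheus_client.parser import text_string_to_metric_families

BASE_URL = "http://localhost:8000"   # hypothetical address
MODEL = "my-model"                   # placeholder model name

GAUGES = (
    "vllm:num_requests_running",
    "vllm:num_requests_waiting",
    "vllm:gpu_cache_usage_perc",
)


def scrape_gauges() -> dict[str, float]:
    """Fetch /metrics and return the three gauges the fix is about."""
    text = requests.get(f"{BASE_URL}/metrics", timeout=5).text
    values = {}
    for family in text_string_to_metric_families(text):
        if family.name in GAUGES and family.samples:
            values[family.name] = family.samples[0].value
    return values


async def main() -> None:
    client = openai.AsyncOpenAI(base_url=f"{BASE_URL}/v1", api_key="EMPTY")

    # Kick off a long completion, then abort it client-side.
    task = asyncio.create_task(
        client.completions.create(model=MODEL,
                                  prompt="Hello",
                                  max_tokens=200,
                                  temperature=0.0))
    await asyncio.sleep(0.5)
    print("while running:", scrape_gauges())

    task.cancel()  # tears down the HTTP request; the server aborts the request
    await asyncio.sleep(1.0)
    # Before the fix these gauges could stay stuck at their last values.
    print("after abort:   ", scrape_gauges())


if __name__ == "__main__":
    asyncio.run(main())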
@@ -1,6 +1,6 @@
 # SPDX-License-Identifier: Apache-2.0
 # SPDX-FileCopyrightText: Copyright contributors to the vLLM project
 
+import asyncio
 import subprocess
 import sys
 import tempfile
@@ -294,6 +294,99 @@ async def test_metrics_exist(server: RemoteOpenAIServer,
         assert metric in response.text
 
 
+@pytest.mark.asyncio
+async def test_abort_metrics_reset(server: RemoteOpenAIServer,
+                                   client: openai.AsyncClient, use_v1: bool):
+
+    running_requests, waiting_requests, kv_cache_usage = (
+        _get_running_metrics_from_api(server))
+
+    # Expect no running requests or kvcache usage
+    assert running_requests == 0
+    assert waiting_requests == 0
+    assert kv_cache_usage == 0.0
+
+    # Start some long-running requests that we can abort
+    tasks = []
+    for _ in range(3):
+        task = asyncio.create_task(
+            client.completions.create(
+                model=MODEL_NAME,
+                prompt=_TOKENIZED_PROMPT,
+                max_tokens=100,  # Long generation to give time to abort
+                temperature=0.0))
+        tasks.append(task)
+
+    # Wait a bit for requests to start processing
+    await asyncio.sleep(0.5)
+
+    # Check that we have running requests
+    running_requests, waiting_requests, kv_cache_usage = (
+        _get_running_metrics_from_api(server))
+
+    # Expect running requests and kvcache usage
+    assert running_requests > 0
+    assert kv_cache_usage > 0
+
+    # Cancel all tasks to abort the requests
+    for task in tasks:
+        task.cancel()
+
+    # Wait for cancellations to be processed
+    await asyncio.sleep(1.0)
+
+    # Check that metrics have reset to zero
+    response = requests.get(server.url_for("metrics"))
+    assert response.status_code == HTTPStatus.OK
+
+    # Verify running and waiting request counts and KV cache usage are zero
+    running_requests_after, waiting_requests_after, kv_cache_usage_after = (
+        _get_running_metrics_from_api(server))
+
+    assert running_requests_after == 0,\
+        (f"Expected 0 running requests after abort, got "
+         f"{running_requests_after}")
+    assert waiting_requests_after == 0,\
+        (f"Expected 0 waiting requests after abort, got "
+         f"{waiting_requests_after}")
+    assert kv_cache_usage_after == 0,\
+        (f"Expected 0% KV cache usage after abort, got "
+         f"{kv_cache_usage_after}")
+
+
+def _get_running_metrics_from_api(server: RemoteOpenAIServer):
+    """Return (running_count, waiting_count, kv_cache_usage)"""
+
+    response = requests.get(server.url_for("metrics"))
+    assert response.status_code == HTTPStatus.OK
+
+    # Extract the running/waiting request counts and KV cache usage
+    running_requests, waiting_requests, kv_cache_usage = None, None, None
+
+    for family in text_string_to_metric_families(response.text):
+        if family.name == "vllm:num_requests_running":
+            for sample in family.samples:
+                if sample.name == "vllm:num_requests_running":
+                    running_requests = sample.value
+                    break
+        elif family.name == "vllm:num_requests_waiting":
+            for sample in family.samples:
+                if sample.name == "vllm:num_requests_waiting":
+                    waiting_requests = sample.value
+                    break
+        elif family.name == "vllm:gpu_cache_usage_perc":
+            for sample in family.samples:
+                if sample.name == "vllm:gpu_cache_usage_perc":
+                    kv_cache_usage = sample.value
+                    break
+
+    assert running_requests is not None
+    assert waiting_requests is not None
+    assert kv_cache_usage is not None
+
+    return running_requests, waiting_requests, kv_cache_usage
+
+
 def test_metrics_exist_run_batch(use_v1: bool):
     input_batch = """{"custom_id": "request-0", "method": "POST", "url": "/v1/embeddings", "body": {"model": "intfloat/multilingual-e5-small", "input": "You are a helpful assistant."}}"""  # noqa: E501
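A note on the parsing in _get_running_metrics_from_api above: prometheus_client groups samples into metric families, and for gauges the sample name matches the family name (counters, by contrast, carry a _total-suffixed sample), which is why the helper checks both names before reading sample.value. A tiny self-contained illustration, run against a hardcoded exposition-format payload with made-up values instead of a live /metrics endpoint:

# Self-contained illustration of the parsing used above, with fake values.
from prometheus_client.parser import text_string_to_metric_families

FAKE_METRICS = """\
# TYPE vllm:num_requests_running gauge
vllm:num_requests_running{model_name="dummy"} 2.0
# TYPE vllm:num_requests_waiting gauge
vllm:num_requests_waiting{model_name="dummy"} 0.0
# TYPE vllm:gpu_cache_usage_perc gauge
vllm:gpu_cache_usage_perc{model_name="dummy"} 0.125
"""

for family in text_string_to_metric_families(FAKE_METRICS):
    for sample in family.samples:
        # For gauges, sample.name == family.name.
        print(f"{sample.name}{sample.labels} = {sample.value}")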
@@ -298,7 +298,12 @@ class BlockPool:
         Returns:
             The KV cache usage (between 0.0 and 1.0).
         """
-        return 1.0 - (self.get_num_free_blocks() / self.num_gpu_blocks)
+
+        # Subtract 1 to account for null block.
+        total_gpu_blocks = self.num_gpu_blocks - 1
+        if not total_gpu_blocks:
+            return 0
+        return 1.0 - (self.get_num_free_blocks() / total_gpu_blocks)
 
     def take_events(self) -> list[KVCacheEvent]:
         """Atomically takes all events and clears the queue.
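For concreteness, the arithmetic behind the BlockPool change above: the pool reserves one null block that is never handed out, so the free-block count tops out at num_gpu_blocks - 1 and the old formula could not return exactly 0.0 for an idle pool (which is what the new test asserts). Dividing by num_gpu_blocks - 1 fixes that, and the added guard only protects the degenerate case where the pool holds nothing but the null block. A small worked example with assumed numbers:

# Worked example with assumed numbers: a pool of 1025 blocks, one of which
# is the reserved null block, sitting completely idle.
num_gpu_blocks = 1025
num_free_blocks = 1024  # every allocatable block is free

old_usage = 1.0 - num_free_blocks / num_gpu_blocks        # ~0.00098, never 0.0
new_usage = 1.0 - num_free_blocks / (num_gpu_blocks - 1)  # exactly 0.0

print(f"old: {old_usage:.5f}  new: {new_usage:.5f}")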
@@ -902,10 +902,13 @@ class Scheduler(SchedulerInterface):
                         finished_requests=finished_set)
             finished_req_ids.clear()
 
-        if engine_core_outputs:
+        if (stats := self.make_stats(spec_decoding_stats)) is not None:
             # Return stats to only one of the front-ends.
-            next(iter(engine_core_outputs.values())).scheduler_stats = (
-                self.make_stats(spec_decoding_stats))
+            if (eco := next(iter(engine_core_outputs.values()), None)) is None:
+                # We must return the stats even if there are no request
+                # outputs this step.
+                engine_core_outputs[0] = eco = EngineCoreOutputs()
+            eco.scheduler_stats = stats
 
         return engine_core_outputs
 
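The Scheduler hunk is the core of the fix: stats used to be piggybacked onto an existing EngineCoreOutputs entry and were silently dropped whenever the step produced none, which is exactly the situation after a batch of aborts. The new code builds the stats first and, if there is no output object to attach them to, creates an empty EngineCoreOutputs under client index 0 so the frontend still receives them. A generic sketch of that "attach to the first entry or create one" pattern, with simplified stand-in types rather than vLLM's actual classes:

# Illustrative sketch of the pattern used in the fix: make sure per-step
# stats always reach the frontend, even when the step produced no outputs.
# The classes and client indexing are simplified stand-ins, not vLLM's API.
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Outputs:
    outputs: list = field(default_factory=list)
    scheduler_stats: Optional[dict] = None


def attach_stats(per_client: dict[int, Outputs], stats: Optional[dict]) -> None:
    if stats is None:
        return
    # Piggyback on an existing output object if there is one...
    if (out := next(iter(per_client.values()), None)) is None:
        # ...otherwise create an empty one so the stats still get delivered.
        per_client[0] = out = Outputs()
    out.scheduler_stats = stats


# A step that aborted requests but produced no outputs:
step_outputs: dict[int, Outputs] = {}
attach_stats(step_outputs, {"num_running_reqs": 0, "kv_cache_usage": 0.0})
print(step_outputs)  # {0: Outputs(outputs=[], scheduler_stats={...})}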