mirror of
https://github.com/vllm-project/vllm.git
synced 2025-10-20 14:53:52 +08:00
Compare commits
1 Commits
6fad29b11b
...
remove-asy
Author | SHA1 | Date | |
---|---|---|---|
0470cac520 |
@ -56,7 +56,6 @@ steps:
|
||||
source_file_dependencies:
|
||||
- vllm/
|
||||
- tests/mq_llm_engine
|
||||
- tests/async_engine
|
||||
- tests/test_inputs.py
|
||||
- tests/test_outputs.py
|
||||
- tests/multimodal
|
||||
@ -66,7 +65,6 @@ steps:
|
||||
commands:
|
||||
- python3 standalone_tests/lazy_imports.py
|
||||
- pytest -v -s mq_llm_engine # MQLLMEngine
|
||||
- pytest -v -s async_engine # AsyncLLMEngine
|
||||
- pytest -v -s test_inputs.py
|
||||
- pytest -v -s test_outputs.py
|
||||
- pytest -v -s multimodal
|
||||
|
@ -1,54 +0,0 @@
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
|
||||
"""vllm.entrypoints.api_server with some extra logging for testing."""
|
||||
from collections.abc import Iterable
|
||||
from typing import Any
|
||||
|
||||
import uvicorn
|
||||
from fastapi.responses import JSONResponse, Response
|
||||
|
||||
import vllm.entrypoints.api_server
|
||||
import vllm.envs as envs
|
||||
from vllm.engine.arg_utils import AsyncEngineArgs
|
||||
from vllm.engine.async_llm_engine import AsyncLLMEngine
|
||||
from vllm.utils import FlexibleArgumentParser
|
||||
|
||||
app = vllm.entrypoints.api_server.app
|
||||
|
||||
|
||||
class AsyncLLMEngineWithStats(AsyncLLMEngine):
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self._num_aborts = 0
|
||||
|
||||
async def _engine_abort(self, request_ids: Iterable[str]):
|
||||
ids = list(request_ids)
|
||||
self._num_aborts += len(ids)
|
||||
await super()._engine_abort(ids)
|
||||
|
||||
def testing_stats(self) -> dict[str, Any]:
|
||||
return {"num_aborted_requests": self._num_aborts}
|
||||
|
||||
|
||||
@app.get("/stats")
|
||||
def stats() -> Response:
|
||||
"""Get the statistics of the engine."""
|
||||
return JSONResponse(engine.testing_stats())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = FlexibleArgumentParser()
|
||||
parser.add_argument("--host", type=str, default="localhost")
|
||||
parser.add_argument("--port", type=int, default=8000)
|
||||
parser = AsyncEngineArgs.add_cli_args(parser)
|
||||
args = parser.parse_args()
|
||||
|
||||
engine_args = AsyncEngineArgs.from_cli_args(args)
|
||||
engine = AsyncLLMEngineWithStats.from_engine_args(engine_args)
|
||||
vllm.entrypoints.api_server.engine = engine
|
||||
uvicorn.run(app,
|
||||
host=args.host,
|
||||
port=args.port,
|
||||
log_level="debug",
|
||||
timeout_keep_alive=envs.VLLM_HTTP_TIMEOUT_KEEP_ALIVE)
|
@ -1,12 +0,0 @@
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.fixture(scope="function", autouse=True)
|
||||
def use_v0_only(monkeypatch):
|
||||
"""
|
||||
Since this module is V0 only, set VLLM_USE_V1=0 for
|
||||
all tests in the module.
|
||||
"""
|
||||
monkeypatch.setenv('VLLM_USE_V1', '0')
|
@ -1,113 +0,0 @@
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
|
||||
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
from multiprocessing import Pool
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
|
||||
|
||||
def _query_server(prompt: str, max_tokens: int = 5) -> dict:
|
||||
response = requests.post("http://localhost:8000/generate",
|
||||
json={
|
||||
"prompt": prompt,
|
||||
"max_tokens": max_tokens,
|
||||
"temperature": 0,
|
||||
"ignore_eos": True
|
||||
})
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
|
||||
def _query_server_long(prompt: str) -> dict:
|
||||
return _query_server(prompt, max_tokens=500)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def api_server(distributed_executor_backend: str):
|
||||
script_path = Path(__file__).parent.joinpath(
|
||||
"api_server_async_engine.py").absolute()
|
||||
commands = [
|
||||
sys.executable,
|
||||
"-u",
|
||||
str(script_path),
|
||||
"--model",
|
||||
"facebook/opt-125m",
|
||||
"--host",
|
||||
"127.0.0.1",
|
||||
"--distributed-executor-backend",
|
||||
distributed_executor_backend,
|
||||
]
|
||||
|
||||
# API Server Test Requires V0.
|
||||
my_env = os.environ.copy()
|
||||
my_env["VLLM_USE_V1"] = "0"
|
||||
uvicorn_process = subprocess.Popen(commands, env=my_env)
|
||||
yield
|
||||
uvicorn_process.terminate()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("distributed_executor_backend", ["mp", "ray"])
|
||||
def test_api_server(api_server, distributed_executor_backend: str):
|
||||
"""
|
||||
Run the API server and test it.
|
||||
|
||||
We run both the server and requests in separate processes.
|
||||
|
||||
We test that the server can handle incoming requests, including
|
||||
multiple requests at the same time, and that it can handle requests
|
||||
being cancelled without crashing.
|
||||
"""
|
||||
with Pool(32) as pool:
|
||||
# Wait until the server is ready
|
||||
prompts = ["warm up"] * 1
|
||||
result = None
|
||||
while not result:
|
||||
try:
|
||||
for r in pool.map(_query_server, prompts):
|
||||
result = r
|
||||
break
|
||||
except requests.exceptions.ConnectionError:
|
||||
time.sleep(1)
|
||||
|
||||
# Actual tests start here
|
||||
# Try with 1 prompt
|
||||
for result in pool.map(_query_server, prompts):
|
||||
assert result
|
||||
|
||||
num_aborted_requests = requests.get(
|
||||
"http://localhost:8000/stats").json()["num_aborted_requests"]
|
||||
assert num_aborted_requests == 0
|
||||
|
||||
# Try with 100 prompts
|
||||
prompts = ["test prompt"] * 100
|
||||
for result in pool.map(_query_server, prompts):
|
||||
assert result
|
||||
|
||||
with Pool(32) as pool:
|
||||
# Cancel requests
|
||||
prompts = ["canceled requests"] * 100
|
||||
pool.map_async(_query_server_long, prompts)
|
||||
time.sleep(0.01)
|
||||
pool.terminate()
|
||||
pool.join()
|
||||
|
||||
# check cancellation stats
|
||||
# give it some times to update the stats
|
||||
time.sleep(1)
|
||||
|
||||
num_aborted_requests = requests.get(
|
||||
"http://localhost:8000/stats").json()["num_aborted_requests"]
|
||||
assert num_aborted_requests > 0
|
||||
|
||||
# check that server still runs after cancellations
|
||||
with Pool(32) as pool:
|
||||
# Try with 100 prompts
|
||||
prompts = ["test prompt after canceled"] * 100
|
||||
for result in pool.map(_query_server, prompts):
|
||||
assert result
|
@ -1,71 +0,0 @@
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
|
||||
|
||||
import pytest
|
||||
|
||||
from vllm.engine.async_llm_engine import RequestTracker
|
||||
from vllm.outputs import RequestOutput
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_request_tracker():
|
||||
tracker = RequestTracker()
|
||||
stream_1 = tracker.add_request("1")
|
||||
assert tracker.new_requests_event.is_set()
|
||||
await tracker.wait_for_new_requests()
|
||||
new, aborted = tracker.get_new_and_aborted_requests()
|
||||
assert not tracker.new_requests_event.is_set()
|
||||
assert len(new) == 1
|
||||
assert new[0]["request_id"] == "1"
|
||||
assert not aborted
|
||||
assert not stream_1.finished
|
||||
|
||||
stream_2 = tracker.add_request("2")
|
||||
stream_3 = tracker.add_request("3")
|
||||
assert tracker.new_requests_event.is_set()
|
||||
await tracker.wait_for_new_requests()
|
||||
new, aborted = tracker.get_new_and_aborted_requests()
|
||||
assert not tracker.new_requests_event.is_set()
|
||||
assert len(new) == 2
|
||||
assert new[0]["request_id"] == "2"
|
||||
assert new[1]["request_id"] == "3"
|
||||
assert not aborted
|
||||
assert not stream_2.finished
|
||||
assert not stream_3.finished
|
||||
|
||||
# request_ids must be unique
|
||||
with pytest.raises(KeyError):
|
||||
tracker.add_request("1")
|
||||
assert not tracker.new_requests_event.is_set()
|
||||
|
||||
tracker.abort_request("1")
|
||||
new, aborted = tracker.get_new_and_aborted_requests()
|
||||
assert len(aborted) == 1
|
||||
assert "1" in aborted
|
||||
assert not new
|
||||
assert stream_1.finished
|
||||
|
||||
stream_4 = tracker.add_request("4")
|
||||
tracker.abort_request("4")
|
||||
assert tracker.new_requests_event.is_set()
|
||||
await tracker.wait_for_new_requests()
|
||||
new, aborted = tracker.get_new_and_aborted_requests()
|
||||
# aborted new requests will cancel each other out -
|
||||
# there's no need for them to propagate into the
|
||||
# engine
|
||||
assert not aborted
|
||||
assert not new
|
||||
assert stream_4.finished
|
||||
|
||||
stream_5 = tracker.add_request("5")
|
||||
assert tracker.new_requests_event.is_set()
|
||||
tracker.process_request_output(
|
||||
RequestOutput("2", "output", [], [], [], finished=True))
|
||||
await tracker.wait_for_new_requests()
|
||||
new, aborted = tracker.get_new_and_aborted_requests()
|
||||
assert not tracker.new_requests_event.is_set()
|
||||
assert not aborted
|
||||
assert len(new) == 1
|
||||
assert new[0]["request_id"] == "5"
|
||||
assert stream_2.finished
|
||||
assert not stream_5.finished
|
Reference in New Issue
Block a user