mirror of
				https://github.com/vllm-project/vllm.git
				synced 2025-10-27 03:24:34 +08:00 
			
		
		
		
	Compare commits
	
		
			1 Commits
		
	
	
		
			v0.10.2
			...
			remove-asy
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 0470cac520 | 
| @ -56,7 +56,6 @@ steps: | ||||
|   source_file_dependencies: | ||||
|   - vllm/ | ||||
|   - tests/mq_llm_engine | ||||
|   - tests/async_engine | ||||
|   - tests/test_inputs.py | ||||
|   - tests/test_outputs.py | ||||
|   - tests/multimodal | ||||
| @ -66,7 +65,6 @@ steps: | ||||
|   commands: | ||||
|   - python3 standalone_tests/lazy_imports.py | ||||
|   - pytest -v -s mq_llm_engine # MQLLMEngine | ||||
|   - pytest -v -s async_engine # AsyncLLMEngine | ||||
|   - pytest -v -s test_inputs.py | ||||
|   - pytest -v -s test_outputs.py | ||||
|   - pytest -v -s multimodal | ||||
|  | ||||
| @ -1,54 +0,0 @@ | ||||
| # SPDX-License-Identifier: Apache-2.0 | ||||
| # SPDX-FileCopyrightText: Copyright contributors to the vLLM project | ||||
| """vllm.entrypoints.api_server with some extra logging for testing.""" | ||||
| from collections.abc import Iterable | ||||
| from typing import Any | ||||
|  | ||||
| import uvicorn | ||||
| from fastapi.responses import JSONResponse, Response | ||||
|  | ||||
| import vllm.entrypoints.api_server | ||||
| import vllm.envs as envs | ||||
| from vllm.engine.arg_utils import AsyncEngineArgs | ||||
| from vllm.engine.async_llm_engine import AsyncLLMEngine | ||||
| from vllm.utils import FlexibleArgumentParser | ||||
|  | ||||
# Reuse the FastAPI app instance from the real api_server module so this
# wrapper serves all of its routes plus the extra testing endpoints.
app = vllm.entrypoints.api_server.app
|  | ||||
|  | ||||
class AsyncLLMEngineWithStats(AsyncLLMEngine):
    """AsyncLLMEngine that tracks how many requests were aborted.

    Used by the API-server tests to verify that client-side
    cancellations are observed by the engine.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Running total of request ids passed to _engine_abort.
        self._num_aborts = 0

    async def _engine_abort(self, request_ids: Iterable[str]):
        # Materialize once: we need both the count and the forwarded value.
        aborted = list(request_ids)
        self._num_aborts = self._num_aborts + len(aborted)
        await super()._engine_abort(aborted)

    def testing_stats(self) -> dict[str, Any]:
        """Return the abort counter for the /stats testing endpoint."""
        return {"num_aborted_requests": self._num_aborts}
|  | ||||
|  | ||||
@app.get("/stats")
def stats() -> Response:
    """Get the statistics of the engine."""
    # `engine` is bound at module level in the __main__ block below.
    payload = engine.testing_stats()
    return JSONResponse(payload)
|  | ||||
|  | ||||
if __name__ == "__main__":
    # Build the CLI: host/port for uvicorn plus all AsyncEngineArgs flags.
    parser = FlexibleArgumentParser()
    parser.add_argument("--host", type=str, default="localhost")
    parser.add_argument("--port", type=int, default=8000)
    parser = AsyncEngineArgs.add_cli_args(parser)
    args = parser.parse_args()

    # Construct the instrumented engine and make it visible both to this
    # module's /stats route and to the wrapped api_server module.
    engine = AsyncLLMEngineWithStats.from_engine_args(
        AsyncEngineArgs.from_cli_args(args))
    vllm.entrypoints.api_server.engine = engine
    uvicorn.run(
        app,
        host=args.host,
        port=args.port,
        log_level="debug",
        timeout_keep_alive=envs.VLLM_HTTP_TIMEOUT_KEEP_ALIVE,
    )
| @ -1,12 +0,0 @@ | ||||
| # SPDX-License-Identifier: Apache-2.0 | ||||
| # SPDX-FileCopyrightText: Copyright contributors to the vLLM project | ||||
| import pytest | ||||
|  | ||||
|  | ||||
@pytest.fixture(scope="function", autouse=True)
def use_v0_only(monkeypatch):
    """Pin every test in this module to the V0 engine.

    The module under test is V0-only, so VLLM_USE_V1=0 is set
    automatically for each test function.
    """
    monkeypatch.setenv('VLLM_USE_V1', '0')
| @ -1,113 +0,0 @@ | ||||
| # SPDX-License-Identifier: Apache-2.0 | ||||
| # SPDX-FileCopyrightText: Copyright contributors to the vLLM project | ||||
|  | ||||
| import os | ||||
| import subprocess | ||||
| import sys | ||||
| import time | ||||
| from multiprocessing import Pool | ||||
| from pathlib import Path | ||||
|  | ||||
| import pytest | ||||
| import requests | ||||
|  | ||||
|  | ||||
def _query_server(prompt: str, max_tokens: int = 5) -> dict:
    """POST a greedy, fixed-length generation request to the local server.

    Raises requests.HTTPError on a non-2xx response; returns the decoded
    JSON body otherwise.
    """
    payload = {
        "prompt": prompt,
        "max_tokens": max_tokens,
        "temperature": 0,
        "ignore_eos": True,
    }
    response = requests.post("http://localhost:8000/generate", json=payload)
    response.raise_for_status()
    return response.json()
|  | ||||
|  | ||||
def _query_server_long(prompt: str) -> dict:
    """Query the server for a long (500-token) completion.

    Used to create in-flight requests that the tests then cancel.
    """
    return _query_server(prompt, 500)
|  | ||||
|  | ||||
@pytest.fixture
def api_server(distributed_executor_backend: str):
    """Launch the stats-instrumented API server in a child process.

    Yields once the subprocess is spawned (readiness is polled by the
    test itself via HTTP) and guarantees the child is terminated and
    reaped on teardown, even when the test body raises.
    """
    script_path = Path(__file__).parent.joinpath(
        "api_server_async_engine.py").absolute()
    commands = [
        sys.executable,
        "-u",
        str(script_path),
        "--model",
        "facebook/opt-125m",
        "--host",
        "127.0.0.1",
        "--distributed-executor-backend",
        distributed_executor_backend,
    ]

    # API Server Test Requires V0.
    my_env = os.environ.copy()
    my_env["VLLM_USE_V1"] = "0"
    uvicorn_process = subprocess.Popen(commands, env=my_env)
    try:
        yield
    finally:
        # Bug fix: the original called terminate() outside try/finally and
        # never wait()ed, leaving a zombie process (and an orphaned server
        # if the test body raised before teardown ran).
        uvicorn_process.terminate()
        try:
            uvicorn_process.wait(timeout=10)
        except subprocess.TimeoutExpired:
            # SIGTERM was ignored (e.g. server wedged); force-kill and reap.
            uvicorn_process.kill()
            uvicorn_process.wait()
|  | ||||
|  | ||||
@pytest.mark.parametrize("distributed_executor_backend", ["mp", "ray"])
def test_api_server(api_server, distributed_executor_backend: str):
    """
    Run the API server and test it.

    We run both the server and requests in separate processes.

    We test that the server can handle incoming requests, including
    multiple requests at the same time, and that it can handle requests
    being cancelled without crashing.
    """
    with Pool(32) as pool:
        # Wait until the server is ready
        prompts = ["warm up"] * 1
        result = None
        while not result:
            try:
                for r in pool.map(_query_server, prompts):
                    result = r
                    break
            except requests.exceptions.ConnectionError:
                # Server subprocess is not listening yet; retry after a pause.
                time.sleep(1)

        # Actual tests start here
        # Try with 1 prompt
        for result in pool.map(_query_server, prompts):
            assert result

        # Nothing has been cancelled yet, so the abort counter exposed by
        # the instrumented server's /stats endpoint must still be zero.
        num_aborted_requests = requests.get(
            "http://localhost:8000/stats").json()["num_aborted_requests"]
        assert num_aborted_requests == 0

        # Try with 100 prompts
        prompts = ["test prompt"] * 100
        for result in pool.map(_query_server, prompts):
            assert result

    with Pool(32) as pool:
        # Cancel requests
        prompts = ["canceled requests"] * 100
        pool.map_async(_query_server_long, prompts)
        time.sleep(0.01)
        # Killing the worker pool drops the in-flight HTTP connections; the
        # server should observe these as client-side cancellations.
        pool.terminate()
        pool.join()

        # check cancellation stats
        # give it some times to update the stats
        time.sleep(1)

        num_aborted_requests = requests.get(
            "http://localhost:8000/stats").json()["num_aborted_requests"]
        assert num_aborted_requests > 0

    # check that server still runs after cancellations
    with Pool(32) as pool:
        # Try with 100 prompts
        prompts = ["test prompt after canceled"] * 100
        for result in pool.map(_query_server, prompts):
            assert result
| @ -1,71 +0,0 @@ | ||||
| # SPDX-License-Identifier: Apache-2.0 | ||||
| # SPDX-FileCopyrightText: Copyright contributors to the vLLM project | ||||
|  | ||||
| import pytest | ||||
|  | ||||
| from vllm.engine.async_llm_engine import RequestTracker | ||||
| from vllm.outputs import RequestOutput | ||||
|  | ||||
|  | ||||
@pytest.mark.asyncio
async def test_request_tracker():
    """Exercise RequestTracker's new/aborted bookkeeping end to end.

    Verifies that newly added requests set the wakeup event, that
    get_new_and_aborted_requests drains and clears state, that duplicate
    ids are rejected, and that an add immediately followed by an abort
    cancels out before reaching the engine.
    """
    tracker = RequestTracker()
    stream_1 = tracker.add_request("1")
    # Adding a request must signal the engine's wakeup event.
    assert tracker.new_requests_event.is_set()
    await tracker.wait_for_new_requests()
    new, aborted = tracker.get_new_and_aborted_requests()
    # Draining the tracker clears the event until the next add.
    assert not tracker.new_requests_event.is_set()
    assert len(new) == 1
    assert new[0]["request_id"] == "1"
    assert not aborted
    assert not stream_1.finished

    stream_2 = tracker.add_request("2")
    stream_3 = tracker.add_request("3")
    assert tracker.new_requests_event.is_set()
    await tracker.wait_for_new_requests()
    new, aborted = tracker.get_new_and_aborted_requests()
    assert not tracker.new_requests_event.is_set()
    # Both pending adds are returned in insertion order.
    assert len(new) == 2
    assert new[0]["request_id"] == "2"
    assert new[1]["request_id"] == "3"
    assert not aborted
    assert not stream_2.finished
    assert not stream_3.finished

    # request_ids must be unique
    with pytest.raises(KeyError):
        tracker.add_request("1")
    # A rejected duplicate must not signal the wakeup event.
    assert not tracker.new_requests_event.is_set()

    tracker.abort_request("1")
    new, aborted = tracker.get_new_and_aborted_requests()
    assert len(aborted) == 1
    assert "1" in aborted
    assert not new
    # Aborting finishes the request's output stream.
    assert stream_1.finished

    stream_4 = tracker.add_request("4")
    tracker.abort_request("4")
    assert tracker.new_requests_event.is_set()
    await tracker.wait_for_new_requests()
    new, aborted = tracker.get_new_and_aborted_requests()
    # aborted new requests will cancel each other out -
    # there's no need for them to propagate into the
    # engine
    assert not aborted
    assert not new
    assert stream_4.finished

    stream_5 = tracker.add_request("5")
    assert tracker.new_requests_event.is_set()
    # A finished engine output for "2" should close stream_2 without
    # appearing in either the new or aborted lists.
    tracker.process_request_output(
        RequestOutput("2", "output", [], [], [], finished=True))
    await tracker.wait_for_new_requests()
    new, aborted = tracker.get_new_and_aborted_requests()
    assert not tracker.new_requests_event.is_set()
    assert not aborted
    assert len(new) == 1
    assert new[0]["request_id"] == "5"
    assert stream_2.finished
    assert not stream_5.finished
		Reference in New Issue
	
	Block a user
	