mirror of
https://github.com/vllm-project/vllm.git
synced 2025-10-20 23:03:52 +08:00
1990 lines
69 KiB
Python
1990 lines
69 KiB
Python
# SPDX-License-Identifier: Apache-2.0
|
|
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
|
|
|
|
import asyncio
|
|
import gc
|
|
import hashlib
|
|
import importlib
|
|
import inspect
|
|
import json
|
|
import multiprocessing
|
|
import multiprocessing.forkserver as forkserver
|
|
import os
|
|
import secrets
|
|
import signal
|
|
import socket
|
|
import tempfile
|
|
import uuid
|
|
from argparse import Namespace
|
|
from collections.abc import AsyncGenerator, AsyncIterator, Awaitable, Callable
|
|
from contextlib import asynccontextmanager
|
|
from http import HTTPStatus
|
|
from typing import Annotated, Any, Literal
|
|
|
|
import prometheus_client
|
|
import pydantic
|
|
import regex as re
|
|
import uvloop
|
|
from fastapi import APIRouter, Depends, FastAPI, Form, HTTPException, Query, Request
|
|
from fastapi.exceptions import RequestValidationError
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.responses import JSONResponse, Response, StreamingResponse
|
|
from prometheus_client import make_asgi_app
|
|
from prometheus_fastapi_instrumentator import Instrumentator
|
|
from starlette.concurrency import iterate_in_threadpool
|
|
from starlette.datastructures import URL, Headers, MutableHeaders, State
|
|
from starlette.routing import Mount
|
|
from starlette.types import ASGIApp, Message, Receive, Scope, Send
|
|
from typing_extensions import assert_never
|
|
|
|
import vllm.envs as envs
|
|
from vllm.config import VllmConfig
|
|
from vllm.engine.arg_utils import AsyncEngineArgs
|
|
from vllm.engine.protocol import EngineClient
|
|
from vllm.entrypoints.chat_utils import (
|
|
load_chat_template,
|
|
resolve_hf_chat_template,
|
|
resolve_mistral_chat_template,
|
|
)
|
|
from vllm.entrypoints.launcher import serve_http
|
|
from vllm.entrypoints.logger import RequestLogger
|
|
from vllm.entrypoints.openai.cli_args import make_arg_parser, validate_parsed_serve_args
|
|
from vllm.entrypoints.openai.protocol import (
|
|
ChatCompletionRequest,
|
|
ChatCompletionResponse,
|
|
ClassificationRequest,
|
|
ClassificationResponse,
|
|
CompletionRequest,
|
|
CompletionResponse,
|
|
DetokenizeRequest,
|
|
DetokenizeResponse,
|
|
EmbeddingRequest,
|
|
EmbeddingResponse,
|
|
ErrorInfo,
|
|
ErrorResponse,
|
|
IOProcessorResponse,
|
|
LoadLoRAAdapterRequest,
|
|
PoolingRequest,
|
|
PoolingResponse,
|
|
RerankRequest,
|
|
RerankResponse,
|
|
ResponsesRequest,
|
|
ResponsesResponse,
|
|
ScoreRequest,
|
|
ScoreResponse,
|
|
StreamingResponsesResponse,
|
|
TokenizeRequest,
|
|
TokenizeResponse,
|
|
TranscriptionRequest,
|
|
TranscriptionResponse,
|
|
TranslationRequest,
|
|
TranslationResponse,
|
|
UnloadLoRAAdapterRequest,
|
|
)
|
|
from vllm.entrypoints.openai.serving_chat import OpenAIServingChat
|
|
from vllm.entrypoints.openai.serving_classification import ServingClassification
|
|
from vllm.entrypoints.openai.serving_completion import OpenAIServingCompletion
|
|
from vllm.entrypoints.openai.serving_embedding import OpenAIServingEmbedding
|
|
from vllm.entrypoints.openai.serving_engine import OpenAIServing
|
|
from vllm.entrypoints.openai.serving_models import (
|
|
BaseModelPath,
|
|
LoRAModulePath,
|
|
OpenAIServingModels,
|
|
)
|
|
from vllm.entrypoints.openai.serving_pooling import OpenAIServingPooling
|
|
from vllm.entrypoints.openai.serving_responses import OpenAIServingResponses
|
|
from vllm.entrypoints.openai.serving_score import ServingScores
|
|
from vllm.entrypoints.openai.serving_tokenization import OpenAIServingTokenization
|
|
from vllm.entrypoints.openai.serving_transcription import (
|
|
OpenAIServingTranscription,
|
|
OpenAIServingTranslation,
|
|
)
|
|
from vllm.entrypoints.openai.tool_parsers import ToolParserManager
|
|
from vllm.entrypoints.tool_server import DemoToolServer, MCPToolServer, ToolServer
|
|
from vllm.entrypoints.utils import (
|
|
cli_env_setup,
|
|
load_aware_call,
|
|
log_non_default_args,
|
|
with_cancellation,
|
|
)
|
|
from vllm.logger import init_logger
|
|
from vllm.reasoning import ReasoningParserManager
|
|
from vllm.transformers_utils.tokenizer import MistralTokenizer
|
|
from vllm.usage.usage_lib import UsageContext
|
|
from vllm.utils import (
|
|
Device,
|
|
FlexibleArgumentParser,
|
|
decorate_logs,
|
|
is_valid_ipv6_address,
|
|
set_ulimit,
|
|
)
|
|
from vllm.v1.engine.exceptions import EngineDeadError
|
|
from vllm.v1.metrics.prometheus import get_prometheus_registry
|
|
from vllm.version import __version__ as VLLM_VERSION
|
|
|
|
prometheus_multiproc_dir: tempfile.TemporaryDirectory
|
|
|
|
# Cannot use __name__ (https://github.com/vllm-project/vllm/pull/4765)
|
|
logger = init_logger("vllm.entrypoints.openai.api_server")
|
|
|
|
_running_tasks: set[asyncio.Task] = set()
|
|
|
|
|
|
@asynccontextmanager
|
|
async def lifespan(app: FastAPI):
|
|
try:
|
|
if app.state.log_stats:
|
|
engine_client: EngineClient = app.state.engine_client
|
|
|
|
async def _force_log():
|
|
while True:
|
|
await asyncio.sleep(envs.VLLM_LOG_STATS_INTERVAL)
|
|
await engine_client.do_log_stats()
|
|
|
|
task = asyncio.create_task(_force_log())
|
|
_running_tasks.add(task)
|
|
task.add_done_callback(_running_tasks.remove)
|
|
else:
|
|
task = None
|
|
|
|
# Mark the startup heap as static so that it's ignored by GC.
|
|
# Reduces pause times of oldest generation collections.
|
|
gc.collect()
|
|
gc.freeze()
|
|
try:
|
|
yield
|
|
finally:
|
|
if task is not None:
|
|
task.cancel()
|
|
finally:
|
|
# Ensure app state including engine ref is gc'd
|
|
del app.state
|
|
|
|
|
|
@asynccontextmanager
|
|
async def build_async_engine_client(
|
|
args: Namespace,
|
|
*,
|
|
usage_context: UsageContext = UsageContext.OPENAI_API_SERVER,
|
|
disable_frontend_multiprocessing: bool | None = None,
|
|
client_config: dict[str, Any] | None = None,
|
|
) -> AsyncIterator[EngineClient]:
|
|
if os.getenv("VLLM_WORKER_MULTIPROC_METHOD") == "forkserver":
|
|
# The executor is expected to be mp.
|
|
# Pre-import heavy modules in the forkserver process
|
|
logger.debug("Setup forkserver with pre-imports")
|
|
multiprocessing.set_start_method("forkserver")
|
|
multiprocessing.set_forkserver_preload(["vllm.v1.engine.async_llm"])
|
|
forkserver.ensure_running()
|
|
logger.debug("Forkserver setup complete!")
|
|
|
|
# Context manager to handle engine_client lifecycle
|
|
# Ensures everything is shutdown and cleaned up on error/exit
|
|
engine_args = AsyncEngineArgs.from_cli_args(args)
|
|
if client_config:
|
|
engine_args._api_process_count = client_config.get("client_count", 1)
|
|
engine_args._api_process_rank = client_config.get("client_index", 0)
|
|
|
|
if disable_frontend_multiprocessing is None:
|
|
disable_frontend_multiprocessing = bool(args.disable_frontend_multiprocessing)
|
|
|
|
async with build_async_engine_client_from_engine_args(
|
|
engine_args,
|
|
usage_context=usage_context,
|
|
disable_frontend_multiprocessing=disable_frontend_multiprocessing,
|
|
client_config=client_config,
|
|
) as engine:
|
|
yield engine
|
|
|
|
|
|
@asynccontextmanager
|
|
async def build_async_engine_client_from_engine_args(
|
|
engine_args: AsyncEngineArgs,
|
|
*,
|
|
usage_context: UsageContext = UsageContext.OPENAI_API_SERVER,
|
|
disable_frontend_multiprocessing: bool = False,
|
|
client_config: dict[str, Any] | None = None,
|
|
) -> AsyncIterator[EngineClient]:
|
|
"""
|
|
Create EngineClient, either:
|
|
- in-process using the AsyncLLMEngine Directly
|
|
- multiprocess using AsyncLLMEngine RPC
|
|
|
|
Returns the Client or None if the creation failed.
|
|
"""
|
|
|
|
# Create the EngineConfig (determines if we can use V1).
|
|
vllm_config = engine_args.create_engine_config(usage_context=usage_context)
|
|
|
|
# V1 AsyncLLM.
|
|
assert envs.VLLM_USE_V1
|
|
|
|
if disable_frontend_multiprocessing:
|
|
logger.warning(
|
|
"V1 is enabled, but got --disable-frontend-multiprocessing. "
|
|
"To disable frontend multiprocessing, set VLLM_USE_V1=0."
|
|
)
|
|
|
|
from vllm.v1.engine.async_llm import AsyncLLM
|
|
|
|
async_llm: AsyncLLM | None = None
|
|
|
|
# Don't mutate the input client_config
|
|
client_config = dict(client_config) if client_config else {}
|
|
client_count = client_config.pop("client_count", 1)
|
|
client_index = client_config.pop("client_index", 0)
|
|
|
|
try:
|
|
async_llm = AsyncLLM.from_vllm_config(
|
|
vllm_config=vllm_config,
|
|
usage_context=usage_context,
|
|
enable_log_requests=engine_args.enable_log_requests,
|
|
aggregate_engine_logging=engine_args.aggregate_engine_logging,
|
|
disable_log_stats=engine_args.disable_log_stats,
|
|
client_addresses=client_config,
|
|
client_count=client_count,
|
|
client_index=client_index,
|
|
)
|
|
|
|
# Don't keep the dummy data in memory
|
|
await async_llm.reset_mm_cache()
|
|
|
|
yield async_llm
|
|
finally:
|
|
if async_llm:
|
|
async_llm.shutdown()
|
|
|
|
|
|
async def validate_json_request(raw_request: Request):
|
|
content_type = raw_request.headers.get("content-type", "").lower()
|
|
media_type = content_type.split(";", maxsplit=1)[0]
|
|
if media_type != "application/json":
|
|
raise RequestValidationError(
|
|
errors=["Unsupported Media Type: Only 'application/json' is allowed"]
|
|
)
|
|
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
class PrometheusResponse(Response):
|
|
media_type = prometheus_client.CONTENT_TYPE_LATEST
|
|
|
|
|
|
def mount_metrics(app: FastAPI):
|
|
"""Mount prometheus metrics to a FastAPI app."""
|
|
|
|
registry = get_prometheus_registry()
|
|
|
|
# `response_class=PrometheusResponse` is needed to return an HTTP response
|
|
# with header "Content-Type: text/plain; version=0.0.4; charset=utf-8"
|
|
# instead of the default "application/json" which is incorrect.
|
|
# See https://github.com/trallnag/prometheus-fastapi-instrumentator/issues/163#issue-1296092364
|
|
Instrumentator(
|
|
excluded_handlers=[
|
|
"/metrics",
|
|
"/health",
|
|
"/load",
|
|
"/ping",
|
|
"/version",
|
|
"/server_info",
|
|
],
|
|
registry=registry,
|
|
).add().instrument(app).expose(app, response_class=PrometheusResponse)
|
|
|
|
# Add prometheus asgi middleware to route /metrics requests
|
|
metrics_route = Mount("/metrics", make_asgi_app(registry=registry))
|
|
|
|
# Workaround for 307 Redirect for /metrics
|
|
metrics_route.path_regex = re.compile("^/metrics(?P<path>.*)$")
|
|
app.routes.append(metrics_route)
|
|
|
|
|
|
def base(request: Request) -> OpenAIServing:
|
|
# Reuse the existing instance
|
|
return tokenization(request)
|
|
|
|
|
|
def models(request: Request) -> OpenAIServingModels:
|
|
return request.app.state.openai_serving_models
|
|
|
|
|
|
def responses(request: Request) -> OpenAIServingResponses | None:
|
|
return request.app.state.openai_serving_responses
|
|
|
|
|
|
def chat(request: Request) -> OpenAIServingChat | None:
|
|
return request.app.state.openai_serving_chat
|
|
|
|
|
|
def completion(request: Request) -> OpenAIServingCompletion | None:
|
|
return request.app.state.openai_serving_completion
|
|
|
|
|
|
def pooling(request: Request) -> OpenAIServingPooling | None:
|
|
return request.app.state.openai_serving_pooling
|
|
|
|
|
|
def embedding(request: Request) -> OpenAIServingEmbedding | None:
|
|
return request.app.state.openai_serving_embedding
|
|
|
|
|
|
def score(request: Request) -> ServingScores | None:
|
|
return request.app.state.openai_serving_scores
|
|
|
|
|
|
def classify(request: Request) -> ServingClassification | None:
|
|
return request.app.state.openai_serving_classification
|
|
|
|
|
|
def rerank(request: Request) -> ServingScores | None:
|
|
return request.app.state.openai_serving_scores
|
|
|
|
|
|
def tokenization(request: Request) -> OpenAIServingTokenization:
|
|
return request.app.state.openai_serving_tokenization
|
|
|
|
|
|
def transcription(request: Request) -> OpenAIServingTranscription:
|
|
return request.app.state.openai_serving_transcription
|
|
|
|
|
|
def translation(request: Request) -> OpenAIServingTranslation:
|
|
return request.app.state.openai_serving_translation
|
|
|
|
|
|
def engine_client(request: Request) -> EngineClient:
|
|
return request.app.state.engine_client
|
|
|
|
|
|
@router.get("/health", response_class=Response)
|
|
async def health(raw_request: Request) -> Response:
|
|
"""Health check."""
|
|
try:
|
|
await engine_client(raw_request).check_health()
|
|
return Response(status_code=200)
|
|
except EngineDeadError:
|
|
return Response(status_code=503)
|
|
|
|
|
|
@router.get("/load")
|
|
async def get_server_load_metrics(request: Request):
|
|
# This endpoint returns the current server load metrics.
|
|
# It tracks requests utilizing the GPU from the following routes:
|
|
# - /v1/chat/completions
|
|
# - /v1/completions
|
|
# - /v1/audio/transcriptions
|
|
# - /v1/audio/translations
|
|
# - /v1/embeddings
|
|
# - /pooling
|
|
# - /classify
|
|
# - /score
|
|
# - /v1/score
|
|
# - /rerank
|
|
# - /v1/rerank
|
|
# - /v2/rerank
|
|
return JSONResponse(content={"server_load": request.app.state.server_load_metrics})
|
|
|
|
|
|
@router.get("/ping", response_class=Response)
|
|
@router.post("/ping", response_class=Response)
|
|
async def ping(raw_request: Request) -> Response:
|
|
"""Ping check. Endpoint required for SageMaker"""
|
|
return await health(raw_request)
|
|
|
|
|
|
@router.post(
|
|
"/tokenize",
|
|
dependencies=[Depends(validate_json_request)],
|
|
responses={
|
|
HTTPStatus.BAD_REQUEST.value: {"model": ErrorResponse},
|
|
HTTPStatus.NOT_FOUND.value: {"model": ErrorResponse},
|
|
HTTPStatus.INTERNAL_SERVER_ERROR.value: {"model": ErrorResponse},
|
|
HTTPStatus.NOT_IMPLEMENTED.value: {"model": ErrorResponse},
|
|
},
|
|
)
|
|
@with_cancellation
|
|
async def tokenize(request: TokenizeRequest, raw_request: Request):
|
|
handler = tokenization(raw_request)
|
|
|
|
try:
|
|
generator = await handler.create_tokenize(request, raw_request)
|
|
except NotImplementedError as e:
|
|
raise HTTPException(
|
|
status_code=HTTPStatus.NOT_IMPLEMENTED.value, detail=str(e)
|
|
) from e
|
|
except Exception as e:
|
|
raise HTTPException(
|
|
status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, detail=str(e)
|
|
) from e
|
|
|
|
if isinstance(generator, ErrorResponse):
|
|
return JSONResponse(
|
|
content=generator.model_dump(), status_code=generator.error.code
|
|
)
|
|
elif isinstance(generator, TokenizeResponse):
|
|
return JSONResponse(content=generator.model_dump())
|
|
|
|
assert_never(generator)
|
|
|
|
|
|
@router.post(
|
|
"/detokenize",
|
|
dependencies=[Depends(validate_json_request)],
|
|
responses={
|
|
HTTPStatus.BAD_REQUEST.value: {"model": ErrorResponse},
|
|
HTTPStatus.NOT_FOUND.value: {"model": ErrorResponse},
|
|
HTTPStatus.INTERNAL_SERVER_ERROR.value: {"model": ErrorResponse},
|
|
},
|
|
)
|
|
@with_cancellation
|
|
async def detokenize(request: DetokenizeRequest, raw_request: Request):
|
|
handler = tokenization(raw_request)
|
|
|
|
try:
|
|
generator = await handler.create_detokenize(request, raw_request)
|
|
except OverflowError as e:
|
|
raise RequestValidationError(errors=[str(e)]) from e
|
|
except Exception as e:
|
|
raise HTTPException(
|
|
status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, detail=str(e)
|
|
) from e
|
|
|
|
if isinstance(generator, ErrorResponse):
|
|
return JSONResponse(
|
|
content=generator.model_dump(), status_code=generator.error.code
|
|
)
|
|
elif isinstance(generator, DetokenizeResponse):
|
|
return JSONResponse(content=generator.model_dump())
|
|
|
|
assert_never(generator)
|
|
|
|
|
|
def maybe_register_tokenizer_info_endpoint(args):
|
|
"""Conditionally register the tokenizer info endpoint if enabled."""
|
|
if getattr(args, "enable_tokenizer_info_endpoint", False):
|
|
|
|
@router.get("/tokenizer_info")
|
|
async def get_tokenizer_info(raw_request: Request):
|
|
"""Get comprehensive tokenizer information."""
|
|
result = await tokenization(raw_request).get_tokenizer_info()
|
|
return JSONResponse(
|
|
content=result.model_dump(),
|
|
status_code=result.error.code
|
|
if isinstance(result, ErrorResponse)
|
|
else 200,
|
|
)
|
|
|
|
|
|
@router.get("/v1/models")
|
|
async def show_available_models(raw_request: Request):
|
|
handler = models(raw_request)
|
|
|
|
models_ = await handler.show_available_models()
|
|
return JSONResponse(content=models_.model_dump())
|
|
|
|
|
|
@router.get("/version")
|
|
async def show_version():
|
|
ver = {"version": VLLM_VERSION}
|
|
return JSONResponse(content=ver)
|
|
|
|
|
|
async def _convert_stream_to_sse_events(
|
|
generator: AsyncGenerator[StreamingResponsesResponse, None],
|
|
) -> AsyncGenerator[str, None]:
|
|
"""Convert the generator to a stream of events in SSE format"""
|
|
async for event in generator:
|
|
event_type = getattr(event, "type", "unknown")
|
|
# https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#event_stream_format
|
|
event_data = (
|
|
f"event: {event_type}\ndata: {event.model_dump_json(indent=None)}\n\n"
|
|
)
|
|
yield event_data
|
|
|
|
|
|
@router.post(
|
|
"/v1/responses",
|
|
dependencies=[Depends(validate_json_request)],
|
|
responses={
|
|
HTTPStatus.OK.value: {"content": {"text/event-stream": {}}},
|
|
HTTPStatus.BAD_REQUEST.value: {"model": ErrorResponse},
|
|
HTTPStatus.NOT_FOUND.value: {"model": ErrorResponse},
|
|
HTTPStatus.INTERNAL_SERVER_ERROR.value: {"model": ErrorResponse},
|
|
},
|
|
)
|
|
@with_cancellation
|
|
async def create_responses(request: ResponsesRequest, raw_request: Request):
|
|
handler = responses(raw_request)
|
|
if handler is None:
|
|
return base(raw_request).create_error_response(
|
|
message="The model does not support Responses API"
|
|
)
|
|
try:
|
|
generator = await handler.create_responses(request, raw_request)
|
|
except Exception as e:
|
|
raise HTTPException(
|
|
status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, detail=str(e)
|
|
) from e
|
|
|
|
if isinstance(generator, ErrorResponse):
|
|
return JSONResponse(
|
|
content=generator.model_dump(), status_code=generator.error.code
|
|
)
|
|
elif isinstance(generator, ResponsesResponse):
|
|
return JSONResponse(content=generator.model_dump())
|
|
|
|
return StreamingResponse(
|
|
content=_convert_stream_to_sse_events(generator), media_type="text/event-stream"
|
|
)
|
|
|
|
|
|
@router.get("/v1/responses/{response_id}")
|
|
async def retrieve_responses(
|
|
response_id: str,
|
|
raw_request: Request,
|
|
starting_after: int | None = None,
|
|
stream: bool | None = False,
|
|
):
|
|
handler = responses(raw_request)
|
|
if handler is None:
|
|
return base(raw_request).create_error_response(
|
|
message="The model does not support Responses API"
|
|
)
|
|
|
|
try:
|
|
response = await handler.retrieve_responses(
|
|
response_id,
|
|
starting_after=starting_after,
|
|
stream=stream,
|
|
)
|
|
except Exception as e:
|
|
raise HTTPException(
|
|
status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, detail=str(e)
|
|
) from e
|
|
|
|
if isinstance(response, ErrorResponse):
|
|
return JSONResponse(
|
|
content=response.model_dump(), status_code=response.error.code
|
|
)
|
|
elif isinstance(response, ResponsesResponse):
|
|
return JSONResponse(content=response.model_dump())
|
|
return StreamingResponse(
|
|
content=_convert_stream_to_sse_events(response), media_type="text/event-stream"
|
|
)
|
|
|
|
|
|
@router.post("/v1/responses/{response_id}/cancel")
|
|
async def cancel_responses(response_id: str, raw_request: Request):
|
|
handler = responses(raw_request)
|
|
if handler is None:
|
|
return base(raw_request).create_error_response(
|
|
message="The model does not support Responses API"
|
|
)
|
|
|
|
try:
|
|
response = await handler.cancel_responses(response_id)
|
|
except Exception as e:
|
|
raise HTTPException(
|
|
status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, detail=str(e)
|
|
) from e
|
|
|
|
if isinstance(response, ErrorResponse):
|
|
return JSONResponse(
|
|
content=response.model_dump(), status_code=response.error.code
|
|
)
|
|
return JSONResponse(content=response.model_dump())
|
|
|
|
|
|
@router.post(
|
|
"/v1/chat/completions",
|
|
dependencies=[Depends(validate_json_request)],
|
|
responses={
|
|
HTTPStatus.OK.value: {"content": {"text/event-stream": {}}},
|
|
HTTPStatus.BAD_REQUEST.value: {"model": ErrorResponse},
|
|
HTTPStatus.NOT_FOUND.value: {"model": ErrorResponse},
|
|
HTTPStatus.INTERNAL_SERVER_ERROR.value: {"model": ErrorResponse},
|
|
},
|
|
)
|
|
@with_cancellation
|
|
@load_aware_call
|
|
async def create_chat_completion(request: ChatCompletionRequest, raw_request: Request):
|
|
handler = chat(raw_request)
|
|
if handler is None:
|
|
return base(raw_request).create_error_response(
|
|
message="The model does not support Chat Completions API"
|
|
)
|
|
try:
|
|
generator = await handler.create_chat_completion(request, raw_request)
|
|
except Exception as e:
|
|
raise HTTPException(
|
|
status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, detail=str(e)
|
|
) from e
|
|
if isinstance(generator, ErrorResponse):
|
|
return JSONResponse(
|
|
content=generator.model_dump(), status_code=generator.error.code
|
|
)
|
|
|
|
elif isinstance(generator, ChatCompletionResponse):
|
|
return JSONResponse(content=generator.model_dump())
|
|
|
|
return StreamingResponse(content=generator, media_type="text/event-stream")
|
|
|
|
|
|
@router.post(
|
|
"/v1/completions",
|
|
dependencies=[Depends(validate_json_request)],
|
|
responses={
|
|
HTTPStatus.OK.value: {"content": {"text/event-stream": {}}},
|
|
HTTPStatus.BAD_REQUEST.value: {"model": ErrorResponse},
|
|
HTTPStatus.NOT_FOUND.value: {"model": ErrorResponse},
|
|
HTTPStatus.INTERNAL_SERVER_ERROR.value: {"model": ErrorResponse},
|
|
},
|
|
)
|
|
@with_cancellation
|
|
@load_aware_call
|
|
async def create_completion(request: CompletionRequest, raw_request: Request):
|
|
handler = completion(raw_request)
|
|
if handler is None:
|
|
return base(raw_request).create_error_response(
|
|
message="The model does not support Completions API"
|
|
)
|
|
|
|
try:
|
|
generator = await handler.create_completion(request, raw_request)
|
|
except OverflowError as e:
|
|
raise HTTPException(
|
|
status_code=HTTPStatus.BAD_REQUEST.value, detail=str(e)
|
|
) from e
|
|
except Exception as e:
|
|
raise HTTPException(
|
|
status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, detail=str(e)
|
|
) from e
|
|
|
|
if isinstance(generator, ErrorResponse):
|
|
return JSONResponse(
|
|
content=generator.model_dump(), status_code=generator.error.code
|
|
)
|
|
elif isinstance(generator, CompletionResponse):
|
|
return JSONResponse(content=generator.model_dump())
|
|
|
|
return StreamingResponse(content=generator, media_type="text/event-stream")
|
|
|
|
|
|
@router.post(
|
|
"/v1/embeddings",
|
|
dependencies=[Depends(validate_json_request)],
|
|
responses={
|
|
HTTPStatus.BAD_REQUEST.value: {"model": ErrorResponse},
|
|
HTTPStatus.INTERNAL_SERVER_ERROR.value: {"model": ErrorResponse},
|
|
},
|
|
)
|
|
@with_cancellation
|
|
@load_aware_call
|
|
async def create_embedding(request: EmbeddingRequest, raw_request: Request):
|
|
handler = embedding(raw_request)
|
|
if handler is None:
|
|
return base(raw_request).create_error_response(
|
|
message="The model does not support Embeddings API"
|
|
)
|
|
|
|
try:
|
|
generator = await handler.create_embedding(request, raw_request)
|
|
except Exception as e:
|
|
raise HTTPException(
|
|
status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, detail=str(e)
|
|
) from e
|
|
|
|
if isinstance(generator, ErrorResponse):
|
|
return JSONResponse(
|
|
content=generator.model_dump(), status_code=generator.error.code
|
|
)
|
|
elif isinstance(generator, EmbeddingResponse):
|
|
return JSONResponse(content=generator.model_dump())
|
|
|
|
assert_never(generator)
|
|
|
|
|
|
@router.post(
|
|
"/pooling",
|
|
dependencies=[Depends(validate_json_request)],
|
|
responses={
|
|
HTTPStatus.BAD_REQUEST.value: {"model": ErrorResponse},
|
|
HTTPStatus.INTERNAL_SERVER_ERROR.value: {"model": ErrorResponse},
|
|
},
|
|
)
|
|
@with_cancellation
|
|
@load_aware_call
|
|
async def create_pooling(request: PoolingRequest, raw_request: Request):
|
|
handler = pooling(raw_request)
|
|
if handler is None:
|
|
return base(raw_request).create_error_response(
|
|
message="The model does not support Pooling API"
|
|
)
|
|
try:
|
|
generator = await handler.create_pooling(request, raw_request)
|
|
except Exception as e:
|
|
raise HTTPException(
|
|
status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, detail=str(e)
|
|
) from e
|
|
if isinstance(generator, ErrorResponse):
|
|
return JSONResponse(
|
|
content=generator.model_dump(), status_code=generator.error.code
|
|
)
|
|
elif isinstance(generator, (PoolingResponse, IOProcessorResponse)):
|
|
return JSONResponse(content=generator.model_dump())
|
|
|
|
assert_never(generator)
|
|
|
|
|
|
@router.post("/classify", dependencies=[Depends(validate_json_request)])
|
|
@with_cancellation
|
|
@load_aware_call
|
|
async def create_classify(request: ClassificationRequest, raw_request: Request):
|
|
handler = classify(raw_request)
|
|
if handler is None:
|
|
return base(raw_request).create_error_response(
|
|
message="The model does not support Classification API"
|
|
)
|
|
|
|
try:
|
|
generator = await handler.create_classify(request, raw_request)
|
|
except Exception as e:
|
|
raise HTTPException(
|
|
status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, detail=str(e)
|
|
) from e
|
|
if isinstance(generator, ErrorResponse):
|
|
return JSONResponse(
|
|
content=generator.model_dump(), status_code=generator.error.code
|
|
)
|
|
|
|
elif isinstance(generator, ClassificationResponse):
|
|
return JSONResponse(content=generator.model_dump())
|
|
|
|
assert_never(generator)
|
|
|
|
|
|
@router.post(
|
|
"/score",
|
|
dependencies=[Depends(validate_json_request)],
|
|
responses={
|
|
HTTPStatus.BAD_REQUEST.value: {"model": ErrorResponse},
|
|
HTTPStatus.INTERNAL_SERVER_ERROR.value: {"model": ErrorResponse},
|
|
},
|
|
)
|
|
@with_cancellation
|
|
@load_aware_call
|
|
async def create_score(request: ScoreRequest, raw_request: Request):
|
|
handler = score(raw_request)
|
|
if handler is None:
|
|
return base(raw_request).create_error_response(
|
|
message="The model does not support Score API"
|
|
)
|
|
|
|
try:
|
|
generator = await handler.create_score(request, raw_request)
|
|
except Exception as e:
|
|
raise HTTPException(
|
|
status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, detail=str(e)
|
|
) from e
|
|
if isinstance(generator, ErrorResponse):
|
|
return JSONResponse(
|
|
content=generator.model_dump(), status_code=generator.error.code
|
|
)
|
|
elif isinstance(generator, ScoreResponse):
|
|
return JSONResponse(content=generator.model_dump())
|
|
|
|
assert_never(generator)
|
|
|
|
|
|
@router.post(
|
|
"/v1/score",
|
|
dependencies=[Depends(validate_json_request)],
|
|
responses={
|
|
HTTPStatus.BAD_REQUEST.value: {"model": ErrorResponse},
|
|
HTTPStatus.INTERNAL_SERVER_ERROR.value: {"model": ErrorResponse},
|
|
},
|
|
)
|
|
@with_cancellation
|
|
@load_aware_call
|
|
async def create_score_v1(request: ScoreRequest, raw_request: Request):
|
|
logger.warning(
|
|
"To indicate that Score API is not part of standard OpenAI API, we "
|
|
"have moved it to `/score`. Please update your client accordingly."
|
|
)
|
|
|
|
return await create_score(request, raw_request)
|
|
|
|
|
|
@router.post(
|
|
"/v1/audio/transcriptions",
|
|
responses={
|
|
HTTPStatus.OK.value: {"content": {"text/event-stream": {}}},
|
|
HTTPStatus.BAD_REQUEST.value: {"model": ErrorResponse},
|
|
HTTPStatus.UNPROCESSABLE_ENTITY.value: {"model": ErrorResponse},
|
|
HTTPStatus.INTERNAL_SERVER_ERROR.value: {"model": ErrorResponse},
|
|
},
|
|
)
|
|
@with_cancellation
|
|
@load_aware_call
|
|
async def create_transcriptions(
|
|
raw_request: Request, request: Annotated[TranscriptionRequest, Form()]
|
|
):
|
|
handler = transcription(raw_request)
|
|
if handler is None:
|
|
return base(raw_request).create_error_response(
|
|
message="The model does not support Transcriptions API"
|
|
)
|
|
|
|
audio_data = await request.file.read()
|
|
try:
|
|
generator = await handler.create_transcription(audio_data, request, raw_request)
|
|
except Exception as e:
|
|
raise HTTPException(
|
|
status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, detail=str(e)
|
|
) from e
|
|
|
|
if isinstance(generator, ErrorResponse):
|
|
return JSONResponse(
|
|
content=generator.model_dump(), status_code=generator.error.code
|
|
)
|
|
|
|
elif isinstance(generator, TranscriptionResponse):
|
|
return JSONResponse(content=generator.model_dump())
|
|
|
|
return StreamingResponse(content=generator, media_type="text/event-stream")
|
|
|
|
|
|
@router.post(
|
|
"/v1/audio/translations",
|
|
responses={
|
|
HTTPStatus.OK.value: {"content": {"text/event-stream": {}}},
|
|
HTTPStatus.BAD_REQUEST.value: {"model": ErrorResponse},
|
|
HTTPStatus.UNPROCESSABLE_ENTITY.value: {"model": ErrorResponse},
|
|
HTTPStatus.INTERNAL_SERVER_ERROR.value: {"model": ErrorResponse},
|
|
},
|
|
)
|
|
@with_cancellation
|
|
@load_aware_call
|
|
async def create_translations(
|
|
request: Annotated[TranslationRequest, Form()], raw_request: Request
|
|
):
|
|
handler = translation(raw_request)
|
|
if handler is None:
|
|
return base(raw_request).create_error_response(
|
|
message="The model does not support Translations API"
|
|
)
|
|
|
|
audio_data = await request.file.read()
|
|
try:
|
|
generator = await handler.create_translation(audio_data, request, raw_request)
|
|
except Exception as e:
|
|
raise HTTPException(
|
|
status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, detail=str(e)
|
|
) from e
|
|
|
|
if isinstance(generator, ErrorResponse):
|
|
return JSONResponse(
|
|
content=generator.model_dump(), status_code=generator.error.code
|
|
)
|
|
|
|
elif isinstance(generator, TranslationResponse):
|
|
return JSONResponse(content=generator.model_dump())
|
|
|
|
return StreamingResponse(content=generator, media_type="text/event-stream")
|
|
|
|
|
|
@router.post(
|
|
"/rerank",
|
|
dependencies=[Depends(validate_json_request)],
|
|
responses={
|
|
HTTPStatus.BAD_REQUEST.value: {"model": ErrorResponse},
|
|
HTTPStatus.INTERNAL_SERVER_ERROR.value: {"model": ErrorResponse},
|
|
},
|
|
)
|
|
@with_cancellation
|
|
@load_aware_call
|
|
async def do_rerank(request: RerankRequest, raw_request: Request):
|
|
handler = rerank(raw_request)
|
|
if handler is None:
|
|
return base(raw_request).create_error_response(
|
|
message="The model does not support Rerank (Score) API"
|
|
)
|
|
try:
|
|
generator = await handler.do_rerank(request, raw_request)
|
|
except Exception as e:
|
|
raise HTTPException(
|
|
status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, detail=str(e)
|
|
) from e
|
|
if isinstance(generator, ErrorResponse):
|
|
return JSONResponse(
|
|
content=generator.model_dump(), status_code=generator.error.code
|
|
)
|
|
elif isinstance(generator, RerankResponse):
|
|
return JSONResponse(content=generator.model_dump())
|
|
|
|
assert_never(generator)
|
|
|
|
|
|
@router.post(
|
|
"/v1/rerank",
|
|
dependencies=[Depends(validate_json_request)],
|
|
responses={
|
|
HTTPStatus.BAD_REQUEST.value: {"model": ErrorResponse},
|
|
HTTPStatus.INTERNAL_SERVER_ERROR.value: {"model": ErrorResponse},
|
|
},
|
|
)
|
|
@with_cancellation
|
|
async def do_rerank_v1(request: RerankRequest, raw_request: Request):
|
|
logger.warning_once(
|
|
"To indicate that the rerank API is not part of the standard OpenAI"
|
|
" API, we have located it at `/rerank`. Please update your client "
|
|
"accordingly. (Note: Conforms to JinaAI rerank API)"
|
|
)
|
|
|
|
return await do_rerank(request, raw_request)
|
|
|
|
|
|
@router.post(
|
|
"/v2/rerank",
|
|
dependencies=[Depends(validate_json_request)],
|
|
responses={
|
|
HTTPStatus.BAD_REQUEST.value: {"model": ErrorResponse},
|
|
HTTPStatus.INTERNAL_SERVER_ERROR.value: {"model": ErrorResponse},
|
|
},
|
|
)
|
|
@with_cancellation
|
|
async def do_rerank_v2(request: RerankRequest, raw_request: Request):
|
|
return await do_rerank(request, raw_request)
|
|
|
|
|
|
if envs.VLLM_SERVER_DEV_MODE:
|
|
logger.warning(
|
|
"SECURITY WARNING: Development endpoints are enabled! "
|
|
"This should NOT be used in production!"
|
|
)
|
|
|
|
PydanticVllmConfig = pydantic.TypeAdapter(VllmConfig)
|
|
|
|
@router.get("/server_info")
|
|
async def show_server_info(
|
|
raw_request: Request,
|
|
config_format: Annotated[Literal["text", "json"], Query()] = "text",
|
|
):
|
|
vllm_config: VllmConfig = raw_request.app.state.vllm_config
|
|
server_info = {
|
|
"vllm_config": str(vllm_config)
|
|
if config_format == "text"
|
|
else PydanticVllmConfig.dump_python(vllm_config, mode="json", fallback=str)
|
|
# fallback=str is needed to handle e.g. torch.dtype
|
|
}
|
|
return JSONResponse(content=server_info)
|
|
|
|
@router.post("/reset_prefix_cache")
|
|
async def reset_prefix_cache(raw_request: Request):
|
|
"""
|
|
Reset the prefix cache. Note that we currently do not check if the
|
|
prefix cache is successfully reset in the API server.
|
|
"""
|
|
device = None
|
|
device_str = raw_request.query_params.get("device")
|
|
if device_str is not None:
|
|
device = Device[device_str.upper()]
|
|
logger.info("Resetting prefix cache with specific %s...", str(device))
|
|
await engine_client(raw_request).reset_prefix_cache(device)
|
|
return Response(status_code=200)
|
|
|
|
@router.post("/sleep")
|
|
async def sleep(raw_request: Request):
|
|
# get POST params
|
|
level = raw_request.query_params.get("level", "1")
|
|
await engine_client(raw_request).sleep(int(level))
|
|
# FIXME: in v0 with frontend multiprocessing, the sleep command
|
|
# is sent but does not finish yet when we return a response.
|
|
return Response(status_code=200)
|
|
|
|
@router.post("/wake_up")
|
|
async def wake_up(raw_request: Request):
|
|
tags = raw_request.query_params.getlist("tags")
|
|
if tags == []:
|
|
# set to None to wake up all tags if no tags are provided
|
|
tags = None
|
|
logger.info("wake up the engine with tags: %s", tags)
|
|
await engine_client(raw_request).wake_up(tags)
|
|
# FIXME: in v0 with frontend multiprocessing, the wake-up command
|
|
# is sent but does not finish yet when we return a response.
|
|
return Response(status_code=200)
|
|
|
|
@router.get("/is_sleeping")
|
|
async def is_sleeping(raw_request: Request):
|
|
logger.info("check whether the engine is sleeping")
|
|
is_sleeping = await engine_client(raw_request).is_sleeping()
|
|
return JSONResponse(content={"is_sleeping": is_sleeping})
|
|
|
|
@router.post("/collective_rpc")
|
|
async def collective_rpc(raw_request: Request):
|
|
try:
|
|
body = await raw_request.json()
|
|
except json.JSONDecodeError as e:
|
|
raise HTTPException(
|
|
status_code=HTTPStatus.BAD_REQUEST.value,
|
|
detail=f"JSON decode error: {e}",
|
|
) from e
|
|
method = body.get("method")
|
|
if method is None:
|
|
raise HTTPException(
|
|
status_code=HTTPStatus.BAD_REQUEST.value,
|
|
detail="Missing 'method' in request body",
|
|
)
|
|
# For security reason, only serialized string args/kwargs are passed.
|
|
# User-defined `method` is responsible for deserialization if needed.
|
|
args: list[str] = body.get("args", [])
|
|
kwargs: dict[str, str] = body.get("kwargs", {})
|
|
timeout: float | None = body.get("timeout")
|
|
results = await engine_client(raw_request).collective_rpc(
|
|
method=method, timeout=timeout, args=tuple(args), kwargs=kwargs
|
|
)
|
|
if results is None:
|
|
return Response(status_code=200)
|
|
response: list[Any] = []
|
|
for result in results:
|
|
if result is None or isinstance(result, (dict, list)):
|
|
response.append(result)
|
|
else:
|
|
response.append(str(result))
|
|
return JSONResponse(content={"results": response})
|
|
|
|
|
|
@router.post(
|
|
"/scale_elastic_ep",
|
|
dependencies=[Depends(validate_json_request)],
|
|
responses={
|
|
HTTPStatus.OK.value: {"model": dict},
|
|
HTTPStatus.BAD_REQUEST.value: {"model": ErrorResponse},
|
|
HTTPStatus.REQUEST_TIMEOUT.value: {"model": ErrorResponse},
|
|
HTTPStatus.INTERNAL_SERVER_ERROR.value: {"model": ErrorResponse},
|
|
},
|
|
)
|
|
async def scale_elastic_ep(raw_request: Request):
|
|
try:
|
|
body = await raw_request.json()
|
|
except json.JSONDecodeError as e:
|
|
raise HTTPException(status_code=400, detail="Invalid JSON format") from e # noqa: B904
|
|
|
|
new_data_parallel_size = body.get("new_data_parallel_size")
|
|
drain_timeout = body.get("drain_timeout", 120) # Default 2 minutes
|
|
|
|
if new_data_parallel_size is None:
|
|
raise HTTPException(
|
|
status_code=400, detail="new_data_parallel_size is required"
|
|
)
|
|
|
|
if not isinstance(new_data_parallel_size, int) or new_data_parallel_size <= 0:
|
|
raise HTTPException(
|
|
status_code=400, detail="new_data_parallel_size must be a positive integer"
|
|
)
|
|
|
|
if not isinstance(drain_timeout, int) or drain_timeout <= 0:
|
|
raise HTTPException(
|
|
status_code=400, detail="drain_timeout must be a positive integer"
|
|
)
|
|
|
|
# Set scaling flag to prevent new requests
|
|
global _scaling_elastic_ep
|
|
_scaling_elastic_ep = True
|
|
client = engine_client(raw_request)
|
|
try:
|
|
await client.scale_elastic_ep(new_data_parallel_size, drain_timeout)
|
|
return JSONResponse(
|
|
{
|
|
"message": f"Scaled to {new_data_parallel_size} data parallel engines",
|
|
}
|
|
)
|
|
except TimeoutError as e:
|
|
raise HTTPException(
|
|
status_code=408,
|
|
detail="Scale failed due to request drain timeout "
|
|
f"after {drain_timeout} seconds",
|
|
) from e
|
|
except Exception as e:
|
|
logger.error("Scale failed: %s", e)
|
|
raise HTTPException(status_code=500, detail="Scale failed") from e
|
|
finally:
|
|
_scaling_elastic_ep = False
|
|
|
|
|
|
@router.post("/is_scaling_elastic_ep")
|
|
async def is_scaling_elastic_ep(raw_request: Request):
|
|
return JSONResponse({"is_scaling_elastic_ep": _scaling_elastic_ep})
|
|
|
|
|
|
# TODO: RequestType = TypeForm[BaseModel] when recognized by type checkers
|
|
# (requires typing_extensions >= 4.13)
|
|
RequestType = Any
|
|
GetHandlerFn = Callable[[Request], OpenAIServing | None]
|
|
EndpointFn = Callable[[RequestType, Request], Awaitable[Any]]
|
|
|
|
# NOTE: Items defined earlier take higher priority
|
|
INVOCATION_TYPES: list[tuple[RequestType, tuple[GetHandlerFn, EndpointFn]]] = [
|
|
(ChatCompletionRequest, (chat, create_chat_completion)),
|
|
(CompletionRequest, (completion, create_completion)),
|
|
(EmbeddingRequest, (embedding, create_embedding)),
|
|
(ClassificationRequest, (classify, create_classify)),
|
|
(ScoreRequest, (score, create_score)),
|
|
(RerankRequest, (rerank, do_rerank)),
|
|
(PoolingRequest, (pooling, create_pooling)),
|
|
]
|
|
|
|
# NOTE: Construct the TypeAdapters only once
|
|
INVOCATION_VALIDATORS = [
|
|
(pydantic.TypeAdapter(request_type), (get_handler, endpoint))
|
|
for request_type, (get_handler, endpoint) in INVOCATION_TYPES
|
|
]
|
|
|
|
|
|
@router.post(
|
|
"/invocations",
|
|
dependencies=[Depends(validate_json_request)],
|
|
responses={
|
|
HTTPStatus.BAD_REQUEST.value: {"model": ErrorResponse},
|
|
HTTPStatus.UNSUPPORTED_MEDIA_TYPE.value: {"model": ErrorResponse},
|
|
HTTPStatus.INTERNAL_SERVER_ERROR.value: {"model": ErrorResponse},
|
|
},
|
|
)
|
|
async def invocations(raw_request: Request):
|
|
"""For SageMaker, routes requests based on the request type."""
|
|
try:
|
|
body = await raw_request.json()
|
|
except json.JSONDecodeError as e:
|
|
raise HTTPException(
|
|
status_code=HTTPStatus.BAD_REQUEST.value, detail=f"JSON decode error: {e}"
|
|
) from e
|
|
|
|
valid_endpoints = [
|
|
(validator, endpoint)
|
|
for validator, (get_handler, endpoint) in INVOCATION_VALIDATORS
|
|
if get_handler(raw_request) is not None
|
|
]
|
|
|
|
for request_validator, endpoint in valid_endpoints:
|
|
try:
|
|
request = request_validator.validate_python(body)
|
|
except pydantic.ValidationError:
|
|
continue
|
|
|
|
return await endpoint(request, raw_request)
|
|
|
|
type_names = [
|
|
t.__name__ if isinstance(t := validator._type, type) else str(t)
|
|
for validator, _ in valid_endpoints
|
|
]
|
|
msg = f"Cannot find suitable handler for request. Expected one of: {type_names}"
|
|
res = base(raw_request).create_error_response(message=msg)
|
|
return JSONResponse(content=res.model_dump(), status_code=res.error.code)
|
|
|
|
|
|
if envs.VLLM_TORCH_PROFILER_DIR:
|
|
logger.warning(
|
|
"Torch Profiler is enabled in the API server. This should ONLY be "
|
|
"used for local development!"
|
|
)
|
|
|
|
@router.post("/start_profile")
|
|
async def start_profile(raw_request: Request):
|
|
logger.info("Starting profiler...")
|
|
await engine_client(raw_request).start_profile()
|
|
logger.info("Profiler started.")
|
|
return Response(status_code=200)
|
|
|
|
@router.post("/stop_profile")
|
|
async def stop_profile(raw_request: Request):
|
|
logger.info("Stopping profiler...")
|
|
await engine_client(raw_request).stop_profile()
|
|
logger.info("Profiler stopped.")
|
|
return Response(status_code=200)
|
|
|
|
|
|
if envs.VLLM_ALLOW_RUNTIME_LORA_UPDATING:
|
|
logger.warning(
|
|
"LoRA dynamic loading & unloading is enabled in the API server. "
|
|
"This should ONLY be used for local development!"
|
|
)
|
|
|
|
@router.post("/v1/load_lora_adapter", dependencies=[Depends(validate_json_request)])
|
|
async def load_lora_adapter(request: LoadLoRAAdapterRequest, raw_request: Request):
|
|
handler = models(raw_request)
|
|
response = await handler.load_lora_adapter(request)
|
|
if isinstance(response, ErrorResponse):
|
|
return JSONResponse(
|
|
content=response.model_dump(), status_code=response.error.code
|
|
)
|
|
|
|
return Response(status_code=200, content=response)
|
|
|
|
@router.post(
|
|
"/v1/unload_lora_adapter", dependencies=[Depends(validate_json_request)]
|
|
)
|
|
async def unload_lora_adapter(
|
|
request: UnloadLoRAAdapterRequest, raw_request: Request
|
|
):
|
|
handler = models(raw_request)
|
|
response = await handler.unload_lora_adapter(request)
|
|
if isinstance(response, ErrorResponse):
|
|
return JSONResponse(
|
|
content=response.model_dump(), status_code=response.error.code
|
|
)
|
|
|
|
return Response(status_code=200, content=response)
|
|
|
|
|
|
def load_log_config(log_config_file: str | None) -> dict | None:
|
|
if not log_config_file:
|
|
return None
|
|
try:
|
|
with open(log_config_file) as f:
|
|
return json.load(f)
|
|
except Exception as e:
|
|
logger.warning(
|
|
"Failed to load log config from file %s: error %s", log_config_file, e
|
|
)
|
|
return None
|
|
|
|
|
|
class AuthenticationMiddleware:
|
|
"""
|
|
Pure ASGI middleware that authenticates each request by checking
|
|
if the Authorization Bearer token exists and equals anyof "{api_key}".
|
|
|
|
Notes
|
|
-----
|
|
There are two cases in which authentication is skipped:
|
|
1. The HTTP method is OPTIONS.
|
|
2. The request path doesn't start with /v1 (e.g. /health).
|
|
"""
|
|
|
|
def __init__(self, app: ASGIApp, tokens: list[str]) -> None:
|
|
self.app = app
|
|
self.api_tokens = [hashlib.sha256(t.encode("utf-8")).digest() for t in tokens]
|
|
|
|
def verify_token(self, headers: Headers) -> bool:
|
|
authorization_header_value = headers.get("Authorization")
|
|
if not authorization_header_value:
|
|
return False
|
|
|
|
scheme, _, param = authorization_header_value.partition(" ")
|
|
if scheme.lower() != "bearer":
|
|
return False
|
|
|
|
param_hash = hashlib.sha256(param.encode("utf-8")).digest()
|
|
|
|
token_match = False
|
|
for token_hash in self.api_tokens:
|
|
token_match |= secrets.compare_digest(param_hash, token_hash)
|
|
|
|
return token_match
|
|
|
|
def __call__(self, scope: Scope, receive: Receive, send: Send) -> Awaitable[None]:
|
|
if scope["type"] not in ("http", "websocket") or scope["method"] == "OPTIONS":
|
|
# scope["type"] can be "lifespan" or "startup" for example,
|
|
# in which case we don't need to do anything
|
|
return self.app(scope, receive, send)
|
|
root_path = scope.get("root_path", "")
|
|
url_path = URL(scope=scope).path.removeprefix(root_path)
|
|
headers = Headers(scope=scope)
|
|
# Type narrow to satisfy mypy.
|
|
if url_path.startswith("/v1") and not self.verify_token(headers):
|
|
response = JSONResponse(content={"error": "Unauthorized"}, status_code=401)
|
|
return response(scope, receive, send)
|
|
return self.app(scope, receive, send)
|
|
|
|
|
|
class XRequestIdMiddleware:
|
|
"""
|
|
Middleware the set's the X-Request-Id header for each response
|
|
to a random uuid4 (hex) value if the header isn't already
|
|
present in the request, otherwise use the provided request id.
|
|
"""
|
|
|
|
def __init__(self, app: ASGIApp) -> None:
|
|
self.app = app
|
|
|
|
def __call__(self, scope: Scope, receive: Receive, send: Send) -> Awaitable[None]:
|
|
if scope["type"] not in ("http", "websocket"):
|
|
return self.app(scope, receive, send)
|
|
|
|
# Extract the request headers.
|
|
request_headers = Headers(scope=scope)
|
|
|
|
async def send_with_request_id(message: Message) -> None:
|
|
"""
|
|
Custom send function to mutate the response headers
|
|
and append X-Request-Id to it.
|
|
"""
|
|
if message["type"] == "http.response.start":
|
|
response_headers = MutableHeaders(raw=message["headers"])
|
|
request_id = request_headers.get("X-Request-Id", uuid.uuid4().hex)
|
|
response_headers.append("X-Request-Id", request_id)
|
|
await send(message)
|
|
|
|
return self.app(scope, receive, send_with_request_id)
|
|
|
|
|
|
# Global variable to track scaling state
|
|
_scaling_elastic_ep = False
|
|
|
|
|
|
class ScalingMiddleware:
|
|
"""
|
|
Middleware that checks if the model is currently scaling and
|
|
returns a 503 Service Unavailable response if it is.
|
|
|
|
This middleware applies to all HTTP requests and prevents
|
|
processing when the model is in a scaling state.
|
|
"""
|
|
|
|
def __init__(self, app: ASGIApp) -> None:
|
|
self.app = app
|
|
|
|
def __call__(self, scope: Scope, receive: Receive, send: Send) -> Awaitable[None]:
|
|
if scope["type"] != "http":
|
|
return self.app(scope, receive, send)
|
|
|
|
# Check global scaling state
|
|
global _scaling_elastic_ep
|
|
if _scaling_elastic_ep:
|
|
# Return 503 Service Unavailable response
|
|
response = JSONResponse(
|
|
content={
|
|
"error": "The model is currently scaling. Please try again later."
|
|
},
|
|
status_code=503,
|
|
)
|
|
return response(scope, receive, send)
|
|
|
|
return self.app(scope, receive, send)
|
|
|
|
|
|
def _extract_content_from_chunk(chunk_data: dict) -> str:
|
|
"""Extract content from a streaming response chunk."""
|
|
try:
|
|
from vllm.entrypoints.openai.protocol import (
|
|
ChatCompletionStreamResponse,
|
|
CompletionStreamResponse,
|
|
)
|
|
|
|
# Try using Completion types for type-safe parsing
|
|
if chunk_data.get("object") == "chat.completion.chunk":
|
|
chat_response = ChatCompletionStreamResponse.model_validate(chunk_data)
|
|
if chat_response.choices and chat_response.choices[0].delta.content:
|
|
return chat_response.choices[0].delta.content
|
|
elif chunk_data.get("object") == "text_completion":
|
|
completion_response = CompletionStreamResponse.model_validate(chunk_data)
|
|
if completion_response.choices and completion_response.choices[0].text:
|
|
return completion_response.choices[0].text
|
|
except pydantic.ValidationError:
|
|
# Fallback to manual parsing
|
|
if "choices" in chunk_data and chunk_data["choices"]:
|
|
choice = chunk_data["choices"][0]
|
|
if "delta" in choice and choice["delta"].get("content"):
|
|
return choice["delta"]["content"]
|
|
elif choice.get("text"):
|
|
return choice["text"]
|
|
return ""
|
|
|
|
|
|
class SSEDecoder:
|
|
"""Robust Server-Sent Events decoder for streaming responses."""
|
|
|
|
def __init__(self):
|
|
self.buffer = ""
|
|
self.content_buffer = []
|
|
|
|
def decode_chunk(self, chunk: bytes) -> list[dict]:
|
|
"""Decode a chunk of SSE data and return parsed events."""
|
|
import json
|
|
|
|
try:
|
|
chunk_str = chunk.decode("utf-8")
|
|
except UnicodeDecodeError:
|
|
# Skip malformed chunks
|
|
return []
|
|
|
|
self.buffer += chunk_str
|
|
events = []
|
|
|
|
# Process complete lines
|
|
while "\n" in self.buffer:
|
|
line, self.buffer = self.buffer.split("\n", 1)
|
|
line = line.rstrip("\r") # Handle CRLF
|
|
|
|
if line.startswith("data: "):
|
|
data_str = line[6:].strip()
|
|
if data_str == "[DONE]":
|
|
events.append({"type": "done"})
|
|
elif data_str:
|
|
try:
|
|
event_data = json.loads(data_str)
|
|
events.append({"type": "data", "data": event_data})
|
|
except json.JSONDecodeError:
|
|
# Skip malformed JSON
|
|
continue
|
|
|
|
return events
|
|
|
|
def extract_content(self, event_data: dict) -> str:
|
|
"""Extract content from event data."""
|
|
return _extract_content_from_chunk(event_data)
|
|
|
|
def add_content(self, content: str) -> None:
|
|
"""Add content to the buffer."""
|
|
if content:
|
|
self.content_buffer.append(content)
|
|
|
|
def get_complete_content(self) -> str:
|
|
"""Get the complete buffered content."""
|
|
return "".join(self.content_buffer)
|
|
|
|
|
|
def _log_streaming_response(response, response_body: list) -> None:
|
|
"""Log streaming response with robust SSE parsing."""
|
|
from starlette.concurrency import iterate_in_threadpool
|
|
|
|
sse_decoder = SSEDecoder()
|
|
chunk_count = 0
|
|
|
|
def buffered_iterator():
|
|
nonlocal chunk_count
|
|
|
|
for chunk in response_body:
|
|
chunk_count += 1
|
|
yield chunk
|
|
|
|
# Parse SSE events from chunk
|
|
events = sse_decoder.decode_chunk(chunk)
|
|
|
|
for event in events:
|
|
if event["type"] == "data":
|
|
content = sse_decoder.extract_content(event["data"])
|
|
sse_decoder.add_content(content)
|
|
elif event["type"] == "done":
|
|
# Log complete content when done
|
|
full_content = sse_decoder.get_complete_content()
|
|
if full_content:
|
|
# Truncate if too long
|
|
if len(full_content) > 2048:
|
|
full_content = full_content[:2048] + ""
|
|
"...[truncated]"
|
|
logger.info(
|
|
"response_body={streaming_complete: "
|
|
"content='%s', chunks=%d}",
|
|
full_content,
|
|
chunk_count,
|
|
)
|
|
else:
|
|
logger.info(
|
|
"response_body={streaming_complete: no_content, chunks=%d}",
|
|
chunk_count,
|
|
)
|
|
return
|
|
|
|
response.body_iterator = iterate_in_threadpool(buffered_iterator())
|
|
logger.info("response_body={streaming_started: chunks=%d}", len(response_body))
|
|
|
|
|
|
def _log_non_streaming_response(response_body: list) -> None:
|
|
"""Log non-streaming response."""
|
|
try:
|
|
decoded_body = response_body[0].decode()
|
|
logger.info("response_body={%s}", decoded_body)
|
|
except UnicodeDecodeError:
|
|
logger.info("response_body={<binary_data>}")
|
|
|
|
|
|
def build_app(args: Namespace) -> FastAPI:
|
|
if args.disable_fastapi_docs:
|
|
app = FastAPI(
|
|
openapi_url=None, docs_url=None, redoc_url=None, lifespan=lifespan
|
|
)
|
|
else:
|
|
app = FastAPI(lifespan=lifespan)
|
|
app.include_router(router)
|
|
app.root_path = args.root_path
|
|
|
|
mount_metrics(app)
|
|
|
|
app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=args.allowed_origins,
|
|
allow_credentials=args.allow_credentials,
|
|
allow_methods=args.allowed_methods,
|
|
allow_headers=args.allowed_headers,
|
|
)
|
|
|
|
@app.exception_handler(HTTPException)
|
|
async def http_exception_handler(_: Request, exc: HTTPException):
|
|
err = ErrorResponse(
|
|
error=ErrorInfo(
|
|
message=exc.detail,
|
|
type=HTTPStatus(exc.status_code).phrase,
|
|
code=exc.status_code,
|
|
)
|
|
)
|
|
return JSONResponse(err.model_dump(), status_code=exc.status_code)
|
|
|
|
@app.exception_handler(RequestValidationError)
|
|
async def validation_exception_handler(_: Request, exc: RequestValidationError):
|
|
exc_str = str(exc)
|
|
errors_str = str(exc.errors())
|
|
|
|
if exc.errors() and errors_str and errors_str != exc_str:
|
|
message = f"{exc_str} {errors_str}"
|
|
else:
|
|
message = exc_str
|
|
|
|
err = ErrorResponse(
|
|
error=ErrorInfo(
|
|
message=message,
|
|
type=HTTPStatus.BAD_REQUEST.phrase,
|
|
code=HTTPStatus.BAD_REQUEST,
|
|
)
|
|
)
|
|
return JSONResponse(err.model_dump(), status_code=HTTPStatus.BAD_REQUEST)
|
|
|
|
# Ensure --api-key option from CLI takes precedence over VLLM_API_KEY
|
|
if tokens := [key for key in (args.api_key or [envs.VLLM_API_KEY]) if key]:
|
|
app.add_middleware(AuthenticationMiddleware, tokens=tokens)
|
|
|
|
if args.enable_request_id_headers:
|
|
app.add_middleware(XRequestIdMiddleware)
|
|
|
|
# Add scaling middleware to check for scaling state
|
|
app.add_middleware(ScalingMiddleware)
|
|
|
|
if envs.VLLM_DEBUG_LOG_API_SERVER_RESPONSE:
|
|
logger.warning(
|
|
"CAUTION: Enabling log response in the API Server. "
|
|
"This can include sensitive information and should be "
|
|
"avoided in production."
|
|
)
|
|
|
|
@app.middleware("http")
|
|
async def log_response(request: Request, call_next):
|
|
response = await call_next(request)
|
|
response_body = [section async for section in response.body_iterator]
|
|
response.body_iterator = iterate_in_threadpool(iter(response_body))
|
|
# Check if this is a streaming response by looking at content-type
|
|
content_type = response.headers.get("content-type", "")
|
|
is_streaming = content_type == "text/event-stream; charset=utf-8"
|
|
|
|
# Log response body based on type
|
|
if not response_body:
|
|
logger.info("response_body={<empty>}")
|
|
elif is_streaming:
|
|
_log_streaming_response(response, response_body)
|
|
else:
|
|
_log_non_streaming_response(response_body)
|
|
return response
|
|
|
|
for middleware in args.middleware:
|
|
module_path, object_name = middleware.rsplit(".", 1)
|
|
imported = getattr(importlib.import_module(module_path), object_name)
|
|
if inspect.isclass(imported):
|
|
app.add_middleware(imported) # type: ignore[arg-type]
|
|
elif inspect.iscoroutinefunction(imported):
|
|
app.middleware("http")(imported)
|
|
else:
|
|
raise ValueError(
|
|
f"Invalid middleware {middleware}. Must be a function or a class."
|
|
)
|
|
|
|
return app
|
|
|
|
|
|
async def init_app_state(
|
|
engine_client: EngineClient,
|
|
state: State,
|
|
args: Namespace,
|
|
) -> None:
|
|
vllm_config = engine_client.vllm_config
|
|
|
|
if args.served_model_name is not None:
|
|
served_model_names = args.served_model_name
|
|
else:
|
|
served_model_names = [args.model]
|
|
|
|
if args.enable_log_requests:
|
|
request_logger = RequestLogger(max_log_len=args.max_log_len)
|
|
else:
|
|
request_logger = None
|
|
|
|
base_model_paths = [
|
|
BaseModelPath(name=name, model_path=args.model) for name in served_model_names
|
|
]
|
|
|
|
state.engine_client = engine_client
|
|
state.log_stats = not args.disable_log_stats
|
|
state.vllm_config = vllm_config
|
|
|
|
supported_tasks = await engine_client.get_supported_tasks()
|
|
logger.info("Supported tasks: %s", supported_tasks)
|
|
|
|
resolved_chat_template = load_chat_template(args.chat_template)
|
|
if resolved_chat_template is not None:
|
|
# Get the tokenizer to check official template
|
|
tokenizer = await engine_client.get_tokenizer()
|
|
|
|
if isinstance(tokenizer, MistralTokenizer):
|
|
# The warning is logged in resolve_mistral_chat_template.
|
|
resolved_chat_template = resolve_mistral_chat_template(
|
|
chat_template=resolved_chat_template
|
|
)
|
|
else:
|
|
hf_chat_template = resolve_hf_chat_template(
|
|
tokenizer=tokenizer,
|
|
chat_template=None,
|
|
tools=None,
|
|
model_config=vllm_config.model_config,
|
|
)
|
|
|
|
if hf_chat_template != resolved_chat_template:
|
|
logger.warning(
|
|
"Using supplied chat template: %s\n"
|
|
"It is different from official chat template '%s'. "
|
|
"This discrepancy may lead to performance degradation.",
|
|
resolved_chat_template,
|
|
args.model,
|
|
)
|
|
|
|
if args.tool_server == "demo":
|
|
tool_server: ToolServer | None = DemoToolServer()
|
|
assert isinstance(tool_server, DemoToolServer)
|
|
await tool_server.init_and_validate()
|
|
elif args.tool_server:
|
|
tool_server = MCPToolServer()
|
|
await tool_server.add_tool_server(args.tool_server)
|
|
else:
|
|
tool_server = None
|
|
|
|
# Merge default_mm_loras into the static lora_modules
|
|
default_mm_loras = (
|
|
vllm_config.lora_config.default_mm_loras
|
|
if vllm_config.lora_config is not None
|
|
else {}
|
|
)
|
|
|
|
lora_modules = args.lora_modules
|
|
if default_mm_loras:
|
|
default_mm_lora_paths = [
|
|
LoRAModulePath(
|
|
name=modality,
|
|
path=lora_path,
|
|
)
|
|
for modality, lora_path in default_mm_loras.items()
|
|
]
|
|
if args.lora_modules is None:
|
|
lora_modules = default_mm_lora_paths
|
|
else:
|
|
lora_modules += default_mm_lora_paths
|
|
|
|
state.openai_serving_models = OpenAIServingModels(
|
|
engine_client=engine_client,
|
|
base_model_paths=base_model_paths,
|
|
lora_modules=lora_modules,
|
|
)
|
|
await state.openai_serving_models.init_static_loras()
|
|
state.openai_serving_responses = (
|
|
OpenAIServingResponses(
|
|
engine_client,
|
|
state.openai_serving_models,
|
|
request_logger=request_logger,
|
|
chat_template=resolved_chat_template,
|
|
chat_template_content_format=args.chat_template_content_format,
|
|
return_tokens_as_token_ids=args.return_tokens_as_token_ids,
|
|
enable_auto_tools=args.enable_auto_tool_choice,
|
|
tool_parser=args.tool_call_parser,
|
|
tool_server=tool_server,
|
|
reasoning_parser=args.structured_outputs_config.reasoning_parser,
|
|
enable_prompt_tokens_details=args.enable_prompt_tokens_details,
|
|
enable_force_include_usage=args.enable_force_include_usage,
|
|
enable_log_outputs=args.enable_log_outputs,
|
|
log_error_stack=args.log_error_stack,
|
|
)
|
|
if "generate" in supported_tasks
|
|
else None
|
|
)
|
|
state.openai_serving_chat = (
|
|
OpenAIServingChat(
|
|
engine_client,
|
|
state.openai_serving_models,
|
|
args.response_role,
|
|
request_logger=request_logger,
|
|
chat_template=resolved_chat_template,
|
|
chat_template_content_format=args.chat_template_content_format,
|
|
trust_request_chat_template=args.trust_request_chat_template,
|
|
return_tokens_as_token_ids=args.return_tokens_as_token_ids,
|
|
enable_auto_tools=args.enable_auto_tool_choice,
|
|
exclude_tools_when_tool_choice_none=args.exclude_tools_when_tool_choice_none,
|
|
tool_parser=args.tool_call_parser,
|
|
reasoning_parser=args.structured_outputs_config.reasoning_parser,
|
|
enable_prompt_tokens_details=args.enable_prompt_tokens_details,
|
|
enable_force_include_usage=args.enable_force_include_usage,
|
|
enable_log_outputs=args.enable_log_outputs,
|
|
log_error_stack=args.log_error_stack,
|
|
)
|
|
if "generate" in supported_tasks
|
|
else None
|
|
)
|
|
state.openai_serving_completion = (
|
|
OpenAIServingCompletion(
|
|
engine_client,
|
|
state.openai_serving_models,
|
|
request_logger=request_logger,
|
|
return_tokens_as_token_ids=args.return_tokens_as_token_ids,
|
|
enable_prompt_tokens_details=args.enable_prompt_tokens_details,
|
|
enable_force_include_usage=args.enable_force_include_usage,
|
|
log_error_stack=args.log_error_stack,
|
|
)
|
|
if "generate" in supported_tasks
|
|
else None
|
|
)
|
|
state.openai_serving_pooling = (
|
|
(
|
|
OpenAIServingPooling(
|
|
engine_client,
|
|
state.openai_serving_models,
|
|
supported_tasks=supported_tasks,
|
|
request_logger=request_logger,
|
|
chat_template=resolved_chat_template,
|
|
chat_template_content_format=args.chat_template_content_format,
|
|
trust_request_chat_template=args.trust_request_chat_template,
|
|
log_error_stack=args.log_error_stack,
|
|
)
|
|
)
|
|
if ("token_embed" in supported_tasks or "token_classify" in supported_tasks)
|
|
else None
|
|
)
|
|
state.openai_serving_embedding = (
|
|
OpenAIServingEmbedding(
|
|
engine_client,
|
|
state.openai_serving_models,
|
|
request_logger=request_logger,
|
|
chat_template=resolved_chat_template,
|
|
chat_template_content_format=args.chat_template_content_format,
|
|
trust_request_chat_template=args.trust_request_chat_template,
|
|
log_error_stack=args.log_error_stack,
|
|
)
|
|
if "embed" in supported_tasks
|
|
else None
|
|
)
|
|
state.openai_serving_classification = (
|
|
ServingClassification(
|
|
engine_client,
|
|
state.openai_serving_models,
|
|
request_logger=request_logger,
|
|
log_error_stack=args.log_error_stack,
|
|
)
|
|
if "classify" in supported_tasks
|
|
else None
|
|
)
|
|
state.openai_serving_scores = (
|
|
ServingScores(
|
|
engine_client,
|
|
state.openai_serving_models,
|
|
request_logger=request_logger,
|
|
log_error_stack=args.log_error_stack,
|
|
)
|
|
if ("embed" in supported_tasks or "score" in supported_tasks)
|
|
else None
|
|
)
|
|
state.openai_serving_tokenization = OpenAIServingTokenization(
|
|
engine_client,
|
|
state.openai_serving_models,
|
|
request_logger=request_logger,
|
|
chat_template=resolved_chat_template,
|
|
chat_template_content_format=args.chat_template_content_format,
|
|
trust_request_chat_template=args.trust_request_chat_template,
|
|
log_error_stack=args.log_error_stack,
|
|
)
|
|
state.openai_serving_transcription = (
|
|
OpenAIServingTranscription(
|
|
engine_client,
|
|
state.openai_serving_models,
|
|
request_logger=request_logger,
|
|
log_error_stack=args.log_error_stack,
|
|
enable_force_include_usage=args.enable_force_include_usage,
|
|
)
|
|
if "transcription" in supported_tasks
|
|
else None
|
|
)
|
|
state.openai_serving_translation = (
|
|
OpenAIServingTranslation(
|
|
engine_client,
|
|
state.openai_serving_models,
|
|
request_logger=request_logger,
|
|
log_error_stack=args.log_error_stack,
|
|
enable_force_include_usage=args.enable_force_include_usage,
|
|
)
|
|
if "transcription" in supported_tasks
|
|
else None
|
|
)
|
|
|
|
state.enable_server_load_tracking = args.enable_server_load_tracking
|
|
state.server_load_metrics = 0
|
|
|
|
|
|
def create_server_socket(addr: tuple[str, int]) -> socket.socket:
|
|
family = socket.AF_INET
|
|
if is_valid_ipv6_address(addr[0]):
|
|
family = socket.AF_INET6
|
|
|
|
sock = socket.socket(family=family, type=socket.SOCK_STREAM)
|
|
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
|
|
sock.bind(addr)
|
|
|
|
return sock
|
|
|
|
|
|
def create_server_unix_socket(path: str) -> socket.socket:
|
|
sock = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM)
|
|
sock.bind(path)
|
|
return sock
|
|
|
|
|
|
def validate_api_server_args(args):
|
|
valid_tool_parses = ToolParserManager.tool_parsers.keys()
|
|
if args.enable_auto_tool_choice and args.tool_call_parser not in valid_tool_parses:
|
|
raise KeyError(
|
|
f"invalid tool call parser: {args.tool_call_parser} "
|
|
f"(chose from {{ {','.join(valid_tool_parses)} }})"
|
|
)
|
|
|
|
valid_reasoning_parses = ReasoningParserManager.reasoning_parsers.keys()
|
|
if (
|
|
reasoning_parser := args.structured_outputs_config.reasoning_parser
|
|
) and reasoning_parser not in valid_reasoning_parses:
|
|
raise KeyError(
|
|
f"invalid reasoning parser: {reasoning_parser} "
|
|
f"(chose from {{ {','.join(valid_reasoning_parses)} }})"
|
|
)
|
|
|
|
|
|
def setup_server(args):
|
|
"""Validate API server args, set up signal handler, create socket
|
|
ready to serve."""
|
|
|
|
logger.info("vLLM API server version %s", VLLM_VERSION)
|
|
log_non_default_args(args)
|
|
|
|
if args.tool_parser_plugin and len(args.tool_parser_plugin) > 3:
|
|
ToolParserManager.import_tool_parser(args.tool_parser_plugin)
|
|
|
|
validate_api_server_args(args)
|
|
|
|
# workaround to make sure that we bind the port before the engine is set up.
|
|
# This avoids race conditions with ray.
|
|
# see https://github.com/vllm-project/vllm/issues/8204
|
|
if args.uds:
|
|
sock = create_server_unix_socket(args.uds)
|
|
else:
|
|
sock_addr = (args.host or "", args.port)
|
|
sock = create_server_socket(sock_addr)
|
|
|
|
# workaround to avoid footguns where uvicorn drops requests with too
|
|
# many concurrent requests active
|
|
set_ulimit()
|
|
|
|
def signal_handler(*_) -> None:
|
|
# Interrupt server on sigterm while initializing
|
|
raise KeyboardInterrupt("terminated")
|
|
|
|
signal.signal(signal.SIGTERM, signal_handler)
|
|
|
|
if args.uds:
|
|
listen_address = f"unix:{args.uds}"
|
|
else:
|
|
addr, port = sock_addr
|
|
is_ssl = args.ssl_keyfile and args.ssl_certfile
|
|
host_part = f"[{addr}]" if is_valid_ipv6_address(addr) else addr or "0.0.0.0"
|
|
listen_address = f"http{'s' if is_ssl else ''}://{host_part}:{port}"
|
|
return listen_address, sock
|
|
|
|
|
|
async def run_server(args, **uvicorn_kwargs) -> None:
|
|
"""Run a single-worker API server."""
|
|
|
|
# Add process-specific prefix to stdout and stderr.
|
|
decorate_logs("APIServer")
|
|
|
|
listen_address, sock = setup_server(args)
|
|
await run_server_worker(listen_address, sock, args, **uvicorn_kwargs)
|
|
|
|
|
|
async def run_server_worker(
|
|
listen_address, sock, args, client_config=None, **uvicorn_kwargs
|
|
) -> None:
|
|
"""Run a single API server worker."""
|
|
|
|
if args.tool_parser_plugin and len(args.tool_parser_plugin) > 3:
|
|
ToolParserManager.import_tool_parser(args.tool_parser_plugin)
|
|
|
|
# Load logging config for uvicorn if specified
|
|
log_config = load_log_config(args.log_config_file)
|
|
if log_config is not None:
|
|
uvicorn_kwargs["log_config"] = log_config
|
|
|
|
async with build_async_engine_client(
|
|
args,
|
|
client_config=client_config,
|
|
) as engine_client:
|
|
maybe_register_tokenizer_info_endpoint(args)
|
|
app = build_app(args)
|
|
|
|
await init_app_state(engine_client, app.state, args)
|
|
|
|
logger.info(
|
|
"Starting vLLM API server %d on %s",
|
|
engine_client.vllm_config.parallel_config._api_process_rank,
|
|
listen_address,
|
|
)
|
|
shutdown_task = await serve_http(
|
|
app,
|
|
sock=sock,
|
|
enable_ssl_refresh=args.enable_ssl_refresh,
|
|
host=args.host,
|
|
port=args.port,
|
|
log_level=args.uvicorn_log_level,
|
|
# NOTE: When the 'disable_uvicorn_access_log' value is True,
|
|
# no access log will be output.
|
|
access_log=not args.disable_uvicorn_access_log,
|
|
timeout_keep_alive=envs.VLLM_HTTP_TIMEOUT_KEEP_ALIVE,
|
|
ssl_keyfile=args.ssl_keyfile,
|
|
ssl_certfile=args.ssl_certfile,
|
|
ssl_ca_certs=args.ssl_ca_certs,
|
|
ssl_cert_reqs=args.ssl_cert_reqs,
|
|
h11_max_incomplete_event_size=args.h11_max_incomplete_event_size,
|
|
h11_max_header_count=args.h11_max_header_count,
|
|
**uvicorn_kwargs,
|
|
)
|
|
|
|
# NB: Await server shutdown only after the backend context is exited
|
|
try:
|
|
await shutdown_task
|
|
finally:
|
|
sock.close()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# NOTE(simon):
|
|
# This section should be in sync with vllm/entrypoints/cli/main.py for CLI
|
|
# entrypoints.
|
|
cli_env_setup()
|
|
parser = FlexibleArgumentParser(
|
|
description="vLLM OpenAI-Compatible RESTful API server."
|
|
)
|
|
parser = make_arg_parser(parser)
|
|
args = parser.parse_args()
|
|
validate_parsed_serve_args(args)
|
|
|
|
uvloop.run(run_server(args))
|