Mirror of https://github.com/vllm-project/vllm.git (synced 2025-10-31 14:24:37 +08:00)
Compare commits: v0.10.0rc1...add-utils (3 commits)
| Author | SHA1 | Date |
|---|---|---|
|  | 7d092fc32c |  |
|  | 1a6c27f271 |  |
|  | 3c6fd286b4 |  |

benchmarks/benchmark_one_concurrent.py (new file, 362 lines)
| @@ -0,0 +1,362 @@ | ||||
| # SPDX-License-Identifier: Apache-2.0 | ||||
| import argparse | ||||
| import asyncio | ||||
| import logging | ||||
| import random | ||||
| import time | ||||
| from dataclasses import dataclass | ||||
| from typing import Optional | ||||
|  | ||||
| import aiohttp  # Async HTTP client, used to hit the prefix cache reset endpoint | ||||
| import numpy as np | ||||
| from tqdm import tqdm | ||||
|  | ||||
| from backend_request_func import RequestFuncInput, RequestFuncOutput | ||||
| from benchmark_dataset import RandomDataset, SampleRequest | ||||
|  | ||||
| try: | ||||
|     from vllm.transformers_utils.tokenizer import get_tokenizer | ||||
| except ImportError: | ||||
|     from backend_request_func import get_tokenizer | ||||
|  | ||||
| logger = logging.getLogger(__name__) | ||||
|  | ||||
|  | ||||
| @dataclass | ||||
| class BenchmarkMetrics: | ||||
|     completed: int | ||||
|     total_input: int | ||||
|     total_output: int | ||||
|     mean_ttft_ms: float | ||||
|     median_ttft_ms: float | ||||
|     std_ttft_ms: float | ||||
|     percentiles_ttft_ms: list[tuple[float, float]] | ||||
|     mean_itl_ms: float | ||||
|     median_itl_ms: float | ||||
|     std_itl_ms: float | ||||
|     percentiles_itl_ms: list[tuple[float, float]] | ||||
|     mean_e2el_ms: float | ||||
|     median_e2el_ms: float | ||||
|     std_e2el_ms: float | ||||
|     percentiles_e2el_ms: list[tuple[float, float]] | ||||
|  | ||||
|  | ||||
| async def reset_cache(reset_url: str): | ||||
|     """Sends a POST request to reset the prefix cache.""" | ||||
|     logger.debug("Resetting prefix cache at %s", reset_url) | ||||
|     try: | ||||
|         async with ( | ||||
|             aiohttp.ClientSession() as session, | ||||
|             session.post(reset_url) as response, | ||||
|         ): | ||||
|             response.raise_for_status()  # Raise an exception for bad status codes (4xx or 5xx) | ||||
|             logger.debug("Prefix cache reset successful: %s", response.status) | ||||
|     except aiohttp.ClientConnectorError as e: | ||||
|         logger.error("Failed to connect to cache reset endpoint %s: %s", reset_url, e) | ||||
|     except aiohttp.ClientResponseError as e: | ||||
|         logger.error( | ||||
|             "Cache reset request failed with status %s: %s", e.status, e.message | ||||
|         ) | ||||
|     except Exception as e: | ||||
|         logger.error("An unexpected error occurred during cache reset: %s", e) | ||||
|  | ||||
|  | ||||
| async def sequential_benchmark( | ||||
|     backend: str, | ||||
|     api_url: str, | ||||
|     model_id: str, | ||||
|     tokenizer, | ||||
|     input_requests: list[SampleRequest], | ||||
|     request_func, | ||||
|     selected_percentiles: list[float], | ||||
|     cache_reset_url: Optional[str] = None, | ||||
| ): | ||||
|     """ | ||||
|     Benchmark that processes requests sequentially, waiting for each to complete | ||||
|     before starting the next one. Resets prefix cache between requests. | ||||
|     """ | ||||
|     outputs = [] | ||||
|  | ||||
|     pbar = tqdm(total=len(input_requests)) | ||||
|  | ||||
|     benchmark_start_time = time.perf_counter() | ||||
|  | ||||
|     # Process requests sequentially | ||||
|     for request in input_requests: | ||||
|         prompt, prompt_len, output_len = ( | ||||
|             request.prompt, | ||||
|             request.prompt_len, | ||||
|             request.expected_output_len, | ||||
|         ) | ||||
|  | ||||
|         logger.info("Sending request with len %s", request.prompt_len) | ||||
|         logger.debug('Request str: "%s"', request.prompt[:50]) | ||||
|         request_start_time = time.perf_counter() | ||||
|  | ||||
|         request_func_input = RequestFuncInput( | ||||
|             model=model_id, | ||||
|             prompt=prompt, | ||||
|             api_url=api_url, | ||||
|             prompt_len=prompt_len, | ||||
|             output_len=output_len, | ||||
|         ) | ||||
|  | ||||
|         output = await request_func(request_func_input=request_func_input) | ||||
|  | ||||
|         request_end_time = time.perf_counter() | ||||
|         # Add timing information | ||||
|         if output.success and not hasattr(output, "latency"): | ||||
|             output.latency = request_end_time - request_start_time | ||||
|         logger.info("Finished request with latency %.4f s", output.latency) | ||||
|  | ||||
|         outputs.append(output) | ||||
|         pbar.update(1) | ||||
|  | ||||
|         # Reset the prefix cache between requests so each request starts cold | ||||
|         if cache_reset_url: | ||||
|             await reset_cache(cache_reset_url) | ||||
|  | ||||
|     pbar.close() | ||||
|  | ||||
|     benchmark_duration = time.perf_counter() - benchmark_start_time | ||||
|  | ||||
|     # Calculate metrics | ||||
|     metrics = calculate_metrics( | ||||
|         input_requests=input_requests, | ||||
|         outputs=outputs, | ||||
|         dur_s=benchmark_duration, | ||||
|         tokenizer=tokenizer, | ||||
|         selected_percentiles=selected_percentiles, | ||||
|     ) | ||||
|  | ||||
|     print_results(metrics, benchmark_duration) | ||||
|  | ||||
|     result = { | ||||
|         "duration": benchmark_duration, | ||||
|         "completed": metrics.completed, | ||||
|         "total_input_tokens": metrics.total_input, | ||||
|         "total_output_tokens": metrics.total_output, | ||||
|         "input_lens": [request.prompt_len for request in input_requests], | ||||
|         "output_lens": [ | ||||
|             output.output_tokens if output.success else 0 for output in outputs | ||||
|         ], | ||||
|         "ttfts": [output.ttft for output in outputs if output.success], | ||||
|         "itls": [output.itl for output in outputs if output.success], | ||||
|         "generated_texts": [ | ||||
|             output.generated_text for output in outputs if output.success | ||||
|         ], | ||||
|         "errors": [output.error for output in outputs if not output.success], | ||||
|     } | ||||
|  | ||||
|     # Add summary statistics | ||||
|     for stat_name in ["ttft", "itl", "e2el"]: | ||||
|         for metric_name in ["mean", "median", "std"]: | ||||
|             result[f"{metric_name}_{stat_name}_ms"] = getattr( | ||||
|                 metrics, f"{metric_name}_{stat_name}_ms" | ||||
|             ) | ||||
|  | ||||
|         for p, value in getattr(metrics, f"percentiles_{stat_name}_ms"): | ||||
|             p_word = str(int(p)) if int(p) == p else str(p) | ||||
|             result[f"p{p_word}_{stat_name}_ms"] = value | ||||
|  | ||||
|     return result | ||||
|  | ||||
|  | ||||
| def calculate_metrics( | ||||
|     input_requests: list[SampleRequest], | ||||
|     outputs: list[RequestFuncOutput], | ||||
|     dur_s: float, | ||||
|     tokenizer, | ||||
|     selected_percentiles: list[float], | ||||
| ) -> BenchmarkMetrics: | ||||
|     """Calculate benchmark metrics from results.""" | ||||
|     total_input = 0 | ||||
|     completed = 0 | ||||
|     total_output = 0 | ||||
|     ttfts = [] | ||||
|     itls = [] | ||||
|     e2els = [] | ||||
|  | ||||
|     for i, output in enumerate(outputs): | ||||
|         if output.success: | ||||
|             output_len = output.output_tokens | ||||
|  | ||||
|             if not output_len: | ||||
|                 # Use tokenizer to count output tokens if not provided | ||||
|                 output_len = len( | ||||
|                     tokenizer(output.generated_text, add_special_tokens=False).input_ids | ||||
|                 ) | ||||
|  | ||||
|             total_output += output_len | ||||
|             total_input += input_requests[i].prompt_len | ||||
|  | ||||
|             if hasattr(output, "ttft") and output.ttft is not None: | ||||
|                 ttfts.append(output.ttft) | ||||
|  | ||||
|             if hasattr(output, "itl") and output.itl: | ||||
|                 # Ensure itl is a list of floats | ||||
|                 if isinstance(output.itl, list): | ||||
|                     itls.extend(output.itl) | ||||
|                 else: | ||||
|                     logger.warning( | ||||
|                         "Expected list for ITL but got %s. Appending as is.", | ||||
|                         type(output.itl), | ||||
|                     ) | ||||
|                     itls.append(output.itl) | ||||
|  | ||||
|             if hasattr(output, "latency") and output.latency is not None: | ||||
|                 e2els.append(output.latency) | ||||
|  | ||||
|             completed += 1 | ||||
|  | ||||
|     return BenchmarkMetrics( | ||||
|         completed=completed, | ||||
|         total_input=total_input, | ||||
|         total_output=total_output, | ||||
|         mean_ttft_ms=np.mean(ttfts or [0]) * 1000, | ||||
|         median_ttft_ms=np.median(ttfts or [0]) * 1000, | ||||
|         std_ttft_ms=np.std(ttfts or [0]) * 1000, | ||||
|         percentiles_ttft_ms=[ | ||||
|             (p, np.percentile(ttfts or [0], p) * 1000) for p in selected_percentiles | ||||
|         ], | ||||
|         mean_itl_ms=np.mean(itls or [0]) * 1000, | ||||
|         median_itl_ms=np.median(itls or [0]) * 1000, | ||||
|         std_itl_ms=np.std(itls or [0]) * 1000, | ||||
|         percentiles_itl_ms=[ | ||||
|             (p, np.percentile(itls or [0], p) * 1000) for p in selected_percentiles | ||||
|         ], | ||||
|         mean_e2el_ms=np.mean(e2els or [0]) * 1000, | ||||
|         median_e2el_ms=np.median(e2els or [0]) * 1000, | ||||
|         std_e2el_ms=np.std(e2els or [0]) * 1000, | ||||
|         percentiles_e2el_ms=[ | ||||
|             (p, np.percentile(e2els or [0], p) * 1000) for p in selected_percentiles | ||||
|         ], | ||||
|     ) | ||||
|  | ||||
|  | ||||
| def print_results(metrics: BenchmarkMetrics, benchmark_duration: float): | ||||
|     """Print benchmark results in a formatted way.""" | ||||
|     print("{s:{c}^{n}}".format(s=" Sequential Benchmark Result ", n=60, c="=")) | ||||
|     print("{:<40} {:<10}".format("Successful requests:", metrics.completed)) | ||||
|     print("{:<40} {:<10.2f}".format("Benchmark duration (s):", benchmark_duration)) | ||||
|     print("{:<40} {:<10}".format("Total input tokens:", metrics.total_input)) | ||||
|     print("{:<40} {:<10}".format("Total generated tokens:", metrics.total_output)) | ||||
|  | ||||
|     def print_metric_stats(metric_name, header): | ||||
|         print("{s:{c}^{n}}".format(s=header, n=60, c="-")) | ||||
|         print( | ||||
|             "{:<40} {:<10.2f}".format( | ||||
|                 f"Mean {metric_name} (ms):", | ||||
|                 getattr(metrics, f"mean_{metric_name.lower()}_ms"), | ||||
|             ) | ||||
|         ) | ||||
|         print( | ||||
|             "{:<40} {:<10.2f}".format( | ||||
|                 f"Median {metric_name} (ms):", | ||||
|                 getattr(metrics, f"median_{metric_name.lower()}_ms"), | ||||
|             ) | ||||
|         ) | ||||
|  | ||||
|         for p, value in getattr(metrics, f"percentiles_{metric_name.lower()}_ms"): | ||||
|             p_word = str(int(p)) if int(p) == p else str(p) | ||||
|             print("{:<40} {:<10.2f}".format(f"P{p_word} {metric_name} (ms):", value)) | ||||
|  | ||||
|     print_metric_stats("TTFT", "Time to First Token") | ||||
|     print_metric_stats("ITL", "Inter-token Latency") | ||||
|     print_metric_stats("E2EL", "End-to-end Latency") | ||||
|     print("=" * 60) | ||||
|  | ||||
|  | ||||
| async def main_async(args): | ||||
|     # Import needed functions based on your setup | ||||
|     from backend_request_func import ASYNC_REQUEST_FUNCS | ||||
|  | ||||
|     backend = args.backend | ||||
|     model_id = args.model | ||||
|     tokenizer_id = args.tokenizer if args.tokenizer is not None else args.model | ||||
|  | ||||
|     # Set up API URL | ||||
|     if args.base_url is not None: | ||||
|         api_url = f"{args.base_url}{args.endpoint}" | ||||
|     else: | ||||
|         api_url = f"http://{args.host}:{args.port}{args.endpoint}" | ||||
|  | ||||
|     # Set up Cache Reset URL | ||||
|     cache_reset_url = f"http://{args.host}:{args.port}/reset_prefix_cache" | ||||
|     logger.info("Prefix cache reset configured at: %s", cache_reset_url) | ||||
|  | ||||
|     # Get tokenizer | ||||
|     tokenizer = get_tokenizer(tokenizer_id, trust_remote_code=args.trust_remote_code) | ||||
|  | ||||
|     # Get request function | ||||
|     if backend in ASYNC_REQUEST_FUNCS: | ||||
|         request_func = ASYNC_REQUEST_FUNCS[backend] | ||||
|     else: | ||||
|         raise ValueError(f"Unknown backend: {backend}") | ||||
|  | ||||
|     input_requests = RandomDataset().sample( | ||||
|         tokenizer=tokenizer, | ||||
|         num_requests=args.num_requests, | ||||
|         prefix_len=0, | ||||
|         input_len=args.input_len, | ||||
|         output_len=args.output_len, | ||||
|         range_ratio=0.0, | ||||
|     ) | ||||
|  | ||||
|     # Run benchmark | ||||
|     result = await sequential_benchmark( | ||||
|         backend=backend, | ||||
|         api_url=api_url, | ||||
|         model_id=model_id, | ||||
|         tokenizer=tokenizer, | ||||
|         input_requests=input_requests, | ||||
|         request_func=request_func, | ||||
|         selected_percentiles=[50, 90, 95, 99], | ||||
|         cache_reset_url=cache_reset_url, | ||||
|     ) | ||||
|  | ||||
|     return result | ||||
|  | ||||
|  | ||||
| def main(args): | ||||
|     print(args) | ||||
|     random.seed(args.seed) | ||||
|     np.random.seed(args.seed) | ||||
|  | ||||
|     asyncio.run(main_async(args)) | ||||
|  | ||||
|  | ||||
| if __name__ == "__main__": | ||||
|     parser = argparse.ArgumentParser(description="Sequential benchmark for LLM serving") | ||||
|     parser.add_argument( | ||||
|         "--backend", type=str, default="vllm", help="Backend to use for requests" | ||||
|     ) | ||||
|     parser.add_argument( | ||||
|         "--base-url", | ||||
|         type=str, | ||||
|         default=None, | ||||
|         help="Server base URL (overrides --host and --port)", | ||||
|     ) | ||||
|     parser.add_argument("--host", type=str, default="127.0.0.1") | ||||
|     parser.add_argument("--port", type=int, default=8000) | ||||
|     parser.add_argument( | ||||
|         "--endpoint", type=str, default="/v1/completions", help="API endpoint" | ||||
|     ) | ||||
|     parser.add_argument("--model", type=str, required=True, help="Name of the model") | ||||
|     parser.add_argument( | ||||
|         "--tokenizer", type=str, help="Name of the tokenizer (defaults to model name)" | ||||
|     ) | ||||
|     parser.add_argument( | ||||
|         "--num-requests", type=int, default=100, help="Number of requests to process" | ||||
|     ) | ||||
|     parser.add_argument( | ||||
|         "--input-len", type=int, default=128, help="Input len for generated prompts" | ||||
|     ) | ||||
|     parser.add_argument( | ||||
|         "--output-len", type=int, default=None, help="Override output len for requests" | ||||
|     ) | ||||
|     parser.add_argument("--seed", type=int, default=42) | ||||
|     parser.add_argument( | ||||
|         "--trust-remote-code", | ||||
|         action="store_true", | ||||
|         help="Trust remote code from HuggingFace", | ||||
|     ) | ||||
|  | ||||
|     args = parser.parse_args() | ||||
|     main(args) | ||||
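A minimal usage sketch for the new script, assuming a vLLM OpenAI-compatible server is already running on 127.0.0.1:8000 and serving the model below (the model name and lengths are illustrative values borrowed from the Justfile that follows, not part of the diff):

```bash
# Issues 10 requests one at a time and prints TTFT / ITL / E2EL statistics.
# The script also POSTs to http://127.0.0.1:8000/reset_prefix_cache between requests.
python benchmarks/benchmark_one_concurrent.py \
    --model meta-llama/Llama-3.1-8B-Instruct \
    --input-len 2048 \
    --output-len 1 \
    --num-requests 10 \
    --seed 42
```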

tools/pd_disagg/Justfile (new file, 88 lines)
| @@ -0,0 +1,88 @@ | ||||
| # Needed for the proxy server | ||||
| vllm-directory := "/home/rshaw/vllm/"  | ||||
|  | ||||
| PREFILL_GPU := "0,1,2,3" | ||||
| DECODE_GPU := "4,5,6,7" | ||||
|  | ||||
| PREFILL_TP := env("PREFILL_TP", "1") | ||||
| DECODE_TP := env("DECODE_TP", "1") | ||||
|  | ||||
| BLOCK_SIZE := env("BLOCK_SIZE", "128") | ||||
|  | ||||
| MODEL := "meta-llama/Llama-3.1-8B-Instruct" | ||||
| PROXY_PORT := "8192" | ||||
| PREFILL_PORT := "8100" | ||||
| DECODE_PORT := "8200" | ||||
| PREFILL_NIXL_SIDE_CHANNEL_PORT := "5557" | ||||
| DECODE_NIXL_SIDE_CHANNEL_PORT := "5568" | ||||
|  | ||||
| prefill: | ||||
|     VLLM_NIXL_SIDE_CHANNEL_PORT={{PREFILL_NIXL_SIDE_CHANNEL_PORT}} \ | ||||
|     CUDA_VISIBLE_DEVICES={{PREFILL_GPU}} \ | ||||
|     vllm serve {{MODEL}} \ | ||||
|       --port {{PREFILL_PORT}} \ | ||||
|       --tensor-parallel-size {{PREFILL_TP}} \ | ||||
|       --enforce-eager \ | ||||
|       --disable-log-requests \ | ||||
|       --block-size {{BLOCK_SIZE}} \ | ||||
|       --kv-transfer-config '{"kv_connector":"NixlConnector","kv_role":"kv_both"}' | ||||
|  | ||||
| decode: | ||||
|     VLLM_NIXL_SIDE_CHANNEL_PORT={{DECODE_NIXL_SIDE_CHANNEL_PORT}} \ | ||||
|     CUDA_VISIBLE_DEVICES={{DECODE_GPU}} \ | ||||
|     vllm serve {{MODEL}} \ | ||||
|       --port {{DECODE_PORT}} \ | ||||
|       --tensor-parallel-size {{DECODE_TP}} \ | ||||
|       --enforce-eager \ | ||||
|       --disable-log-requests \ | ||||
|       --block-size {{BLOCK_SIZE}} \ | ||||
|       --kv-transfer-config '{"kv_connector":"NixlConnector","kv_role":"kv_both"}' | ||||
|  | ||||
| proxy: | ||||
|     python "{{vllm-directory}}tests/v1/kv_connector/nixl_integration/toy_proxy_server.py" \ | ||||
|       --port {{PROXY_PORT}} \ | ||||
|       --prefiller-port {{PREFILL_PORT}} \ | ||||
|       --decoder-port {{DECODE_PORT}} | ||||
|  | ||||
| send_request: | ||||
|   curl -X POST http://localhost:{{PROXY_PORT}}/v1/completions \ | ||||
|     -H "Content-Type: application/json" \ | ||||
|     -d '{ \ | ||||
|       "model": "{{MODEL}}", \ | ||||
|       "prompt": "Red Hat is the best open source company by far across Linux, K8s, and AI, and vLLM has the greatest community in open source AI software infrastructure. I love vLLM because", \ | ||||
|       "max_tokens": 150, \ | ||||
|       "temperature": 0.7 \ | ||||
|     }' | ||||
|  | ||||
| benchmark NUM_PROMPTS: | ||||
|   python {{vllm-directory}}/benchmarks/benchmark_serving.py \ | ||||
|     --port {{PROXY_PORT}} \ | ||||
|     --model {{MODEL}} \ | ||||
|     --dataset-name random \ | ||||
|     --random-input-len 30000 \ | ||||
|     --random-output-len 10 \ | ||||
|     --num-prompts {{NUM_PROMPTS}} \ | ||||
|     --seed $(date +%s) | ||||
|  | ||||
| benchmark_one INPUT_LEN: | ||||
|   python {{vllm-directory}}benchmarks/benchmark_one_concurrent.py \ | ||||
|     --port {{PROXY_PORT}} \ | ||||
|     --model {{MODEL}} \ | ||||
|     --input-len {{INPUT_LEN}} \ | ||||
|     --output-len 1 \ | ||||
|     --num-requests 10 \ | ||||
|     --seed $(date +%s) | ||||
|  | ||||
| benchmark_one_no_pd INPUT_LEN: | ||||
|   python {{vllm-directory}}benchmarks/benchmark_one_concurrent.py \ | ||||
|     --port {{DECODE_PORT}} \ | ||||
|     --model {{MODEL}} \ | ||||
|     --input-len {{INPUT_LEN}} \ | ||||
|     --output-len 1 \ | ||||
|     --num-requests 10 \ | ||||
|     --seed $(date +%s) | ||||
|  | ||||
| eval: | ||||
|   lm_eval --model local-completions --tasks gsm8k \ | ||||
|     --model_args model={{MODEL}},base_url=http://127.0.0.1:{{PROXY_PORT}}/v1/completions,num_concurrent=100,max_retries=3,tokenized_requests=False \ | ||||
|     --limit 1000 | ||||
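A possible end-to-end flow with the recipes above, assuming `just` is installed, GPUs 0-7 are available, and `vllm-directory` points at your local checkout (a sketch, not prescribed by the diff):

```bash
# Run each long-lived service in its own terminal.
just prefill                # prefill instance on port 8100 (GPUs 0-3)
just decode                 # decode instance on port 8200 (GPUs 4-7)
just proxy                  # toy proxy in front of both on port 8192

# Then, from another terminal:
just send_request           # single curl smoke test through the proxy
just benchmark_one 30000    # sequential benchmark: 30000-token prompts, 1 output token
just eval                   # GSM8K sanity check via lm_eval against the proxy
```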