From a5ecd94a3f2addbc4b79594db1e5e3b0f7fb101c Mon Sep 17 00:00:00 2001 From: Yuanyuan Chen Date: Wed, 10 Sep 2025 17:38:06 +0800 Subject: [PATCH] Enable ruff on benchmark and scripts (#40634) * Enable ruff on benchmark and scripts Signed-off-by: cyy * Cover benchmark_v2 Signed-off-by: Yuanyuan Chen * correct * style * style --------- Signed-off-by: cyy Signed-off-by: Yuanyuan Chen Co-authored-by: Cyril Vallez --- Makefile | 2 +- benchmark/benches/llama.py | 31 +- benchmark/benchmark.py | 2 - benchmark/benchmarks_entrypoint.py | 245 +++++----- benchmark/optimum_benchmark_wrapper.py | 6 +- benchmark_v2/benches/__init__.py | 2 +- benchmark_v2/benches/llama.py | 104 +++-- benchmark_v2/benchmark_framework.py | 597 ++++++++++++------------- benchmark_v2/run_benchmarks.py | 295 ++++++------ scripts/check_tokenizers.py | 20 +- scripts/stale.py | 6 +- 11 files changed, 661 insertions(+), 649 deletions(-) diff --git a/Makefile b/Makefile index 8a9470b9af5..58994409a06 100644 --- a/Makefile +++ b/Makefile @@ -3,7 +3,7 @@ # make sure to test the local checkout in scripts and not the pre-installed one (don't use quotes!) export PYTHONPATH = src -check_dirs := examples tests src utils +check_dirs := examples tests src utils scripts benchmark benchmark_v2 exclude_folders := "" diff --git a/benchmark/benches/llama.py b/benchmark/benches/llama.py index b2dcf370455..c5ecb17ebbb 100644 --- a/benchmark/benches/llama.py +++ b/benchmark/benches/llama.py @@ -11,25 +11,28 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from logging import Logger import os +import sys +from logging import Logger from threading import Event, Thread from time import perf_counter, sleep from typing import Optional -import sys + # Add the parent directory to Python path to import benchmarks_entrypoint sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) -from benchmarks_entrypoint import MetricsRecorder - import gpustat import psutil import psycopg2 +from benchmarks_entrypoint import MetricsRecorder + # Optional heavy ML dependencies - only required when actually running the benchmark try: import torch + from transformers import AutoModelForCausalLM, AutoTokenizer, GenerationConfig, StaticCache + TRANSFORMERS_AVAILABLE = True except ImportError: TRANSFORMERS_AVAILABLE = False @@ -63,7 +66,13 @@ def collect_metrics(benchmark_id, continue_metric_collection, metrics_recorder): def run_benchmark( - logger: Logger, repository: str, branch: str, commit_id: str, commit_msg: str, metrics_recorder=None, num_tokens_to_generate=100 + logger: Logger, + repository: str, + branch: str, + commit_id: str, + commit_msg: str, + metrics_recorder=None, + num_tokens_to_generate=100, ): # Check if required ML dependencies are available if not TRANSFORMERS_AVAILABLE: @@ -71,11 +80,11 @@ def run_benchmark( logger.error("pip install torch transformers") logger.error("Skipping LLaMA benchmark due to missing dependencies.") return - + continue_metric_collection = Event() metrics_thread = None model_id = "meta-llama/Llama-2-7b-hf" - + # If no metrics_recorder is provided, create one for backward compatibility if metrics_recorder is None: try: @@ -154,7 +163,7 @@ def run_benchmark( # First eager forward pass logger.info("running first eager forward pass") start = perf_counter() - outputs = model(**inputs) + _ = model(**inputs) torch.cuda.synchronize() end = perf_counter() first_eager_fwd_pass_time = end - start @@ -163,7 +172,7 @@ def run_benchmark( # Second eager forward pass (should be faster) logger.info("running second eager forward pass") start = perf_counter() - outputs = model(**inputs) + _ = model(**inputs) torch.cuda.synchronize() end = perf_counter() second_eager_fwd_pass_time = end - start @@ -339,7 +348,7 @@ def run_benchmark( continue_metric_collection.set() if metrics_thread is not None: metrics_thread.join() - + # Only close the recorder if we created it locally if should_close_recorder: - metrics_recorder.close() \ No newline at end of file + metrics_recorder.close() diff --git a/benchmark/benchmark.py b/benchmark/benchmark.py index 831ec424e3d..cef5b3138cf 100644 --- a/benchmark/benchmark.py +++ b/benchmark/benchmark.py @@ -31,9 +31,7 @@ from contextlib import contextmanager from pathlib import Path from git import Repo - from huggingface_hub import HfApi - from optimum_benchmark import Benchmark from optimum_benchmark_wrapper import main diff --git a/benchmark/benchmarks_entrypoint.py b/benchmark/benchmarks_entrypoint.py index 929fe64288d..8c581e9d75e 100644 --- a/benchmark/benchmarks_entrypoint.py +++ b/benchmark/benchmarks_entrypoint.py @@ -13,19 +13,20 @@ # limitations under the License. import argparse import importlib.util +import json import logging import os import sys -import json import uuid from datetime import datetime -from typing import Dict, Tuple, Optional, List import pandas as pd + try: from psycopg2.extensions import register_adapter from psycopg2.extras import Json + register_adapter(dict, Json) PSYCOPG2_AVAILABLE = True except ImportError: @@ -38,8 +39,14 @@ class ImportModuleException(Exception): class MetricsRecorder: def __init__( - self, connection, logger: logging.Logger, repository: str, branch: str, commit_id: str, commit_msg: str, - collect_csv_data: bool = True + self, + connection, + logger: logging.Logger, + repository: str, + branch: str, + commit_id: str, + commit_msg: str, + collect_csv_data: bool = True, ): self.conn = connection self.use_database = connection is not None @@ -51,27 +58,43 @@ class MetricsRecorder: self.commit_id = commit_id self.commit_msg = commit_msg self.collect_csv_data = collect_csv_data - + # For CSV export - store all data in pandas DataFrames (only if CSV collection is enabled) if self.collect_csv_data: # Initialize empty DataFrames with proper schemas - self.benchmarks_df = pd.DataFrame(columns=[ - 'benchmark_id', 'repository', 'branch', 'commit_id', 'commit_message', - 'metadata', 'created_at' - ]) - self.device_measurements_df = pd.DataFrame(columns=[ - 'benchmark_id', 'cpu_util', 'mem_megabytes', 'gpu_util', - 'gpu_mem_megabytes', 'time' - ]) - self.model_measurements_df = pd.DataFrame(columns=[ - 'benchmark_id', 'time', 'model_load_time', 'first_eager_forward_pass_time_secs', - 'second_eager_forward_pass_time_secs', 'first_eager_generate_time_secs', - 'second_eager_generate_time_secs', 'time_to_first_token_secs', - 'time_to_second_token_secs', 'time_to_third_token_secs', - 'time_to_next_token_mean_secs', 'first_compile_generate_time_secs', - 'second_compile_generate_time_secs', 'third_compile_generate_time_secs', - 'fourth_compile_generate_time_secs' - ]) + self.benchmarks_df = pd.DataFrame( + columns=[ + "benchmark_id", + "repository", + "branch", + "commit_id", + "commit_message", + "metadata", + "created_at", + ] + ) + self.device_measurements_df = pd.DataFrame( + columns=["benchmark_id", "cpu_util", "mem_megabytes", "gpu_util", "gpu_mem_megabytes", "time"] + ) + self.model_measurements_df = pd.DataFrame( + columns=[ + "benchmark_id", + "time", + "model_load_time", + "first_eager_forward_pass_time_secs", + "second_eager_forward_pass_time_secs", + "first_eager_generate_time_secs", + "second_eager_generate_time_secs", + "time_to_first_token_secs", + "time_to_second_token_secs", + "time_to_third_token_secs", + "time_to_next_token_mean_secs", + "first_compile_generate_time_secs", + "second_compile_generate_time_secs", + "third_compile_generate_time_secs", + "fourth_compile_generate_time_secs", + ] + ) else: self.benchmarks_df = None self.device_measurements_df = None @@ -83,7 +106,7 @@ class MetricsRecorder: """ # Generate a unique UUID for this benchmark benchmark_id = str(uuid.uuid4()) - + if self.use_database: with self.conn.cursor() as cur: cur.execute( @@ -91,28 +114,32 @@ class MetricsRecorder: (benchmark_id, self.repository, self.branch, self.commit_id, self.commit_msg, metadata), ) self.logger.debug(f"initialised benchmark #{benchmark_id}") - + # Store benchmark data for CSV export (if enabled) if self.collect_csv_data: # Add row to pandas DataFrame - new_row = pd.DataFrame([{ - 'benchmark_id': benchmark_id, - 'repository': self.repository, - 'branch': self.branch, - 'commit_id': self.commit_id, - 'commit_message': self.commit_msg, - 'metadata': json.dumps(metadata), - 'created_at': datetime.utcnow().isoformat() - }]) + new_row = pd.DataFrame( + [ + { + "benchmark_id": benchmark_id, + "repository": self.repository, + "branch": self.branch, + "commit_id": self.commit_id, + "commit_message": self.commit_msg, + "metadata": json.dumps(metadata), + "created_at": datetime.utcnow().isoformat(), + } + ] + ) self.benchmarks_df = pd.concat([self.benchmarks_df, new_row], ignore_index=True) - + mode_info = [] if self.use_database: mode_info.append("database") if self.collect_csv_data: mode_info.append("CSV") mode_str = " + ".join(mode_info) if mode_info else "no storage" - + self.logger.debug(f"initialised benchmark #{benchmark_id} ({mode_str} mode)") return benchmark_id @@ -123,16 +150,20 @@ class MetricsRecorder: # Store device measurements for CSV export (if enabled) if self.collect_csv_data: # Add row to pandas DataFrame - new_row = pd.DataFrame([{ - 'benchmark_id': benchmark_id, - 'cpu_util': cpu_util, - 'mem_megabytes': mem_megabytes, - 'gpu_util': gpu_util, - 'gpu_mem_megabytes': gpu_mem_megabytes, - 'time': datetime.utcnow().isoformat() - }]) + new_row = pd.DataFrame( + [ + { + "benchmark_id": benchmark_id, + "cpu_util": cpu_util, + "mem_megabytes": mem_megabytes, + "gpu_util": gpu_util, + "gpu_mem_megabytes": gpu_mem_megabytes, + "time": datetime.utcnow().isoformat(), + } + ] + ) self.device_measurements_df = pd.concat([self.device_measurements_df, new_row], ignore_index=True) - + # Store in database if available if self.use_database: with self.conn.cursor() as cur: @@ -140,7 +171,7 @@ class MetricsRecorder: "INSERT INTO device_measurements (benchmark_id, cpu_util, mem_megabytes, gpu_util, gpu_mem_megabytes) VALUES (%s, %s, %s, %s, %s)", (benchmark_id, cpu_util, mem_megabytes, gpu_util, gpu_mem_megabytes), ) - + self.logger.debug( f"collected device measurements for benchmark #{benchmark_id} [CPU util: {cpu_util}, mem MBs: {mem_megabytes}, GPU util: {gpu_util}, GPU mem MBs: {gpu_mem_megabytes}]" ) @@ -149,16 +180,13 @@ class MetricsRecorder: # Store model measurements for CSV export (if enabled) if self.collect_csv_data: # Add row to pandas DataFrame with flattened measurements - row_data = { - 'benchmark_id': benchmark_id, - 'time': datetime.utcnow().isoformat() - } + row_data = {"benchmark_id": benchmark_id, "time": datetime.utcnow().isoformat()} # Flatten the measurements dict into the row row_data.update(measurements) - + new_row = pd.DataFrame([row_data]) self.model_measurements_df = pd.concat([self.model_measurements_df, new_row], ignore_index=True) - + # Store in database if available if self.use_database: with self.conn.cursor() as cur: @@ -174,7 +202,7 @@ class MetricsRecorder: measurements, ), ) - + self.logger.debug(f"collected model measurements for benchmark #{benchmark_id}: {measurements}") def export_to_csv(self, output_dir: str = "benchmark_results"): @@ -184,19 +212,19 @@ class MetricsRecorder: if not self.collect_csv_data: self.logger.warning("CSV data collection is disabled - no CSV files will be generated") return - + if not os.path.exists(output_dir): os.makedirs(output_dir) self.logger.info(f"Created output directory: {output_dir}") - + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") files_created = [] - + # Export using pandas DataFrames self._export_pandas_data(output_dir, timestamp, files_created) - + self.logger.info(f"CSV export complete! Created {len(files_created)} files in {output_dir}") - + def _export_pandas_data(self, output_dir: str, timestamp: str, files_created: list): """ Export CSV files using pandas DataFrames @@ -206,24 +234,24 @@ class MetricsRecorder: self.benchmarks_df.to_csv(benchmarks_file, index=False) files_created.append(benchmarks_file) self.logger.info(f"Exported {len(self.benchmarks_df)} benchmark records to {benchmarks_file}") - - # Export device measurements + + # Export device measurements device_file = os.path.join(output_dir, f"device_measurements_{timestamp}.csv") self.device_measurements_df.to_csv(device_file, index=False) files_created.append(device_file) self.logger.info(f"Exported {len(self.device_measurements_df)} device measurement records to {device_file}") - + # Export model measurements (already flattened) model_file = os.path.join(output_dir, f"model_measurements_{timestamp}.csv") self.model_measurements_df.to_csv(model_file, index=False) files_created.append(model_file) self.logger.info(f"Exported {len(self.model_measurements_df)} model measurement records to {model_file}") - + # Create comprehensive summary using pandas operations summary_file = os.path.join(output_dir, f"benchmark_summary_{timestamp}.csv") self._create_summary(summary_file) files_created.append(summary_file) - + def _create_summary(self, summary_file: str): """ Create a comprehensive summary CSV using pandas operations @@ -234,36 +262,42 @@ class MetricsRecorder: summary_df.to_csv(summary_file, index=False) self.logger.info(f"Created empty benchmark summary at {summary_file}") return - + # Start with benchmarks as the base summary_df = self.benchmarks_df.copy() - + # Add model measurements (join on benchmark_id) if len(self.model_measurements_df) > 0: # Drop 'time' column from model measurements to avoid conflicts - model_df = self.model_measurements_df.drop(columns=['time'], errors='ignore') - summary_df = summary_df.merge(model_df, on='benchmark_id', how='left') - + model_df = self.model_measurements_df.drop(columns=["time"], errors="ignore") + summary_df = summary_df.merge(model_df, on="benchmark_id", how="left") + # Calculate device measurement aggregates using pandas groupby if len(self.device_measurements_df) > 0: - device_agg = self.device_measurements_df.groupby('benchmark_id').agg({ - 'cpu_util': ['mean', 'max', 'std', 'count'], - 'mem_megabytes': ['mean', 'max', 'std'], - 'gpu_util': ['mean', 'max', 'std'], - 'gpu_mem_megabytes': ['mean', 'max', 'std'] - }).round(3) - + device_agg = ( + self.device_measurements_df.groupby("benchmark_id") + .agg( + { + "cpu_util": ["mean", "max", "std", "count"], + "mem_megabytes": ["mean", "max", "std"], + "gpu_util": ["mean", "max", "std"], + "gpu_mem_megabytes": ["mean", "max", "std"], + } + ) + .round(3) + ) + # Flatten column names device_agg.columns = [f"{col[0]}_{col[1]}" for col in device_agg.columns] device_agg = device_agg.reset_index() - + # Rename count column to be more descriptive - if 'cpu_util_count' in device_agg.columns: - device_agg = device_agg.rename(columns={'cpu_util_count': 'device_measurement_count'}) - + if "cpu_util_count" in device_agg.columns: + device_agg = device_agg.rename(columns={"cpu_util_count": "device_measurement_count"}) + # Merge with summary - summary_df = summary_df.merge(device_agg, on='benchmark_id', how='left') - + summary_df = summary_df.merge(device_agg, on="benchmark_id", how="left") + # Export the comprehensive summary summary_df.to_csv(summary_file, index=False) self.logger.info(f"Created comprehensive benchmark summary with {len(summary_df)} records at {summary_file}") @@ -312,23 +346,18 @@ def parse_arguments() -> tuple[str, str, str, str, bool, str]: type=str, help="The commit message associated with the commit, truncated to 70 characters.", ) - - parser.add_argument( - "--csv", - action="store_true", - default=False, - help="Enable CSV output files generation." - ) - + + parser.add_argument("--csv", action="store_true", default=False, help="Enable CSV output files generation.") + parser.add_argument( "--csv-output-dir", type=str, default="benchmark_results", - help="Directory for CSV output files (default: benchmark_results)." + help="Directory for CSV output files (default: benchmark_results).", ) args = parser.parse_args() - + # CSV is disabled by default, only enabled when --csv is used generate_csv = args.csv @@ -353,9 +382,10 @@ def create_database_connection(): if not PSYCOPG2_AVAILABLE: logger.warning("psycopg2 not available - running in CSV-only mode") return None - + try: import psycopg2 + conn = psycopg2.connect("dbname=metrics") logger.info("Successfully connected to database") return conn @@ -364,27 +394,28 @@ def create_database_connection(): return None -def create_global_metrics_recorder(repository: str, branch: str, commit_id: str, commit_msg: str, - generate_csv: bool = False) -> MetricsRecorder: +def create_global_metrics_recorder( + repository: str, branch: str, commit_id: str, commit_msg: str, generate_csv: bool = False +) -> MetricsRecorder: """ Create a global metrics recorder that will be used across all benchmarks. """ connection = create_database_connection() recorder = MetricsRecorder(connection, logger, repository, branch, commit_id, commit_msg, generate_csv) - + # Log the storage mode storage_modes = [] if connection is not None: storage_modes.append("database") if generate_csv: storage_modes.append("CSV") - + if not storage_modes: logger.warning("Running benchmarks with NO data storage (no database connection, CSV disabled)") logger.warning("Use --csv flag to enable CSV output when database is unavailable") else: logger.info(f"Running benchmarks with: {' + '.join(storage_modes)} storage") - + return recorder @@ -393,16 +424,16 @@ if __name__ == "__main__": benches_folder_path = os.path.join(benchmarks_folder_path, "benches") repository, branch, commit_id, commit_msg, generate_csv, csv_output_dir = parse_arguments() - + # Create a global metrics recorder global_metrics_recorder = create_global_metrics_recorder(repository, branch, commit_id, commit_msg, generate_csv) - + successful_benchmarks = 0 failed_benchmarks = 0 - + # Automatically discover all benchmark modules in benches/ folder benchmark_modules = [] - + if os.path.exists(benches_folder_path): logger.debug(f"Scanning for benchmarks in: {benches_folder_path}") for entry in os.scandir(benches_folder_path): @@ -410,12 +441,12 @@ if __name__ == "__main__": continue if entry.name.startswith("__"): # Skip __init__.py, __pycache__, etc. continue - + # Check if the file has a run_benchmark function try: logger.debug(f"checking if benches/{entry.name} has run_benchmark function") module = import_from_path(entry.name.split(".")[0], entry.path) - if hasattr(module, 'run_benchmark'): + if hasattr(module, "run_benchmark"): benchmark_modules.append(entry.name) logger.debug(f"discovered benchmark: {entry.name}") else: @@ -436,16 +467,18 @@ if __name__ == "__main__": logger.debug(f"loading: {module_name}") module = import_from_path(module_name.split(".")[0], module_path) logger.info(f"running benchmarks in: {module_name}") - + # Check if the module has an updated run_benchmark function that accepts metrics_recorder try: # Try the new signature first module.run_benchmark(logger, repository, branch, commit_id, commit_msg, global_metrics_recorder) except TypeError: # Fall back to the old signature for backward compatibility - logger.warning(f"Module {module_name} using old run_benchmark signature - database connection will be created per module") + logger.warning( + f"Module {module_name} using old run_benchmark signature - database connection will be created per module" + ) module.run_benchmark(logger, repository, branch, commit_id, commit_msg) - + successful_benchmarks += 1 except ImportModuleException as e: logger.error(e) @@ -461,7 +494,7 @@ if __name__ == "__main__": logger.info(f"CSV reports have been generated and saved to the {csv_output_dir} directory") else: logger.info("CSV generation disabled - no CSV files created (use --csv to enable)") - + logger.info(f"Benchmark run completed. Successful: {successful_benchmarks}, Failed: {failed_benchmarks}") except Exception as e: logger.error(f"Failed to export CSV results: {e}") diff --git a/benchmark/optimum_benchmark_wrapper.py b/benchmark/optimum_benchmark_wrapper.py index c43e9a73e31..afa6804bf06 100644 --- a/benchmark/optimum_benchmark_wrapper.py +++ b/benchmark/optimum_benchmark_wrapper.py @@ -3,7 +3,11 @@ import subprocess def main(config_dir, config_name, args): - subprocess.run(["optimum-benchmark", "--config-dir", f"{config_dir}", "--config-name", f"{config_name}"] + ["hydra/job_logging=disabled", "hydra/hydra_logging=disabled"] + args) + subprocess.run( + ["optimum-benchmark", "--config-dir", f"{config_dir}", "--config-name", f"{config_name}"] + + ["hydra/job_logging=disabled", "hydra/hydra_logging=disabled"] + + args + ) if __name__ == "__main__": diff --git a/benchmark_v2/benches/__init__.py b/benchmark_v2/benches/__init__.py index 6e70a5add84..64b106a0037 100644 --- a/benchmark_v2/benches/__init__.py +++ b/benchmark_v2/benches/__init__.py @@ -1 +1 @@ -# Benchmark implementations directory \ No newline at end of file +# Benchmark implementations directory diff --git a/benchmark_v2/benches/llama.py b/benchmark_v2/benches/llama.py index 7075f5834c0..23427a8549c 100644 --- a/benchmark_v2/benches/llama.py +++ b/benchmark_v2/benches/llama.py @@ -12,55 +12,63 @@ # See the License for the specific language governing permissions and # limitations under the License. -import os import logging -from typing import Dict, Any, List - -from benchmark_framework import ModelBenchmark +import os +from typing import Any import torch +from benchmark_framework import ModelBenchmark + os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "1" os.environ["TOKENIZERS_PARALLELISM"] = "1" torch.set_float32_matmul_precision("high") + class LLaMABenchmark(ModelBenchmark): """Simplified LLaMA model benchmark implementation using the ModelBenchmark base class.""" - + def __init__(self, logger: logging.Logger): super().__init__(logger) self._default_prompt = "Why dogs are so cute?" # Custom prompt for LLaMA - - - def get_scenario_configs(self) -> List[Dict[str, Any]]: + def get_scenario_configs(self) -> list[dict[str, Any]]: """ Get LLaMA-specific scenario configurations. - + Returns: List of scenario configuration dictionaries """ return [ # Eager variants {"variant": "eager", "compile_mode": None, "use_cache": True, "description": "Eager execution with cache"}, - # Compiled variants - {"variant": "compiled", "compile_mode": "max-autotune", "use_cache": True, "description": "Compiled with max autotune"}, - + { + "variant": "compiled", + "compile_mode": "max-autotune", + "use_cache": True, + "description": "Compiled with max autotune", + }, # Kernelized variant (if available) - {"variant": "kernelized", "compile_mode": "max-autotune", "use_cache": True, "description": "Kernelized execution"}, + { + "variant": "kernelized", + "compile_mode": "max-autotune", + "use_cache": True, + "description": "Kernelized execution", + }, ] - + def _is_kernelization_available(self) -> bool: """Check if kernelization is available for LLaMA.""" try: - from kernels import Mode, kernelize + from kernels import Mode, kernelize # noqa: F401 + return True except ImportError: self.logger.debug("Kernelization not available: kernels module not found") return False - - def get_default_generation_config(self) -> Dict[str, Any]: + + def get_default_generation_config(self) -> dict[str, Any]: """Get LLaMA-specific generation configuration.""" return { "do_sample": False, @@ -69,20 +77,19 @@ class LLaMABenchmark(ModelBenchmark): "repetition_penalty": 1.0, "max_new_tokens": None, # Will be set per scenario } - - def get_model_init_kwargs(self, config) -> Dict[str, Any]: + + def get_model_init_kwargs(self, config) -> dict[str, Any]: """Get LLaMA-specific model initialization kwargs.""" - from benchmark_framework import BenchmarkConfig return { "torch_dtype": getattr(torch, config.torch_dtype), "attn_implementation": config.attn_implementation, "use_cache": True, } - + def get_default_torch_dtype(self) -> str: """Get default torch dtype for LLaMA.""" return "float16" # LLaMA works well with float16 - + def get_default_device(self) -> str: """Get default device for LLaMA.""" return "cuda" # LLaMA prefers CUDA @@ -91,35 +98,37 @@ class LLaMABenchmark(ModelBenchmark): def run_llama(logger, output_dir, **kwargs): """ Run LLaMA benchmark with the given configuration. - + Args: logger: Logger instance output_dir: Output directory for results **kwargs: Additional configuration options - + Returns: Path to output file if successful """ from benchmark_framework import BenchmarkRunner - + # Extract parameters with defaults - model_id = kwargs.get('model_id', 'meta-llama/Llama-2-7b-hf') - warmup_iterations = kwargs.get('warmup_iterations', 3) - measurement_iterations = kwargs.get('measurement_iterations', 5) - num_tokens_to_generate = kwargs.get('num_tokens_to_generate', 100) - include_sdpa_variants = kwargs.get('include_sdpa_variants', True) - device = kwargs.get('device', 'cuda') - torch_dtype = kwargs.get('torch_dtype', 'float16') - batch_size = kwargs.get('batch_size', 1) - commit_id = kwargs.get('commit_id', None) - + model_id = kwargs.get("model_id", "meta-llama/Llama-2-7b-hf") + warmup_iterations = kwargs.get("warmup_iterations", 3) + measurement_iterations = kwargs.get("measurement_iterations", 5) + num_tokens_to_generate = kwargs.get("num_tokens_to_generate", 100) + include_sdpa_variants = kwargs.get("include_sdpa_variants", True) + device = kwargs.get("device", "cuda") + torch_dtype = kwargs.get("torch_dtype", "float16") + batch_size = kwargs.get("batch_size", 1) + commit_id = kwargs.get("commit_id") + logger.info(f"Starting LLaMA benchmark for model: {model_id}") - logger.info(f"Configuration: warmup={warmup_iterations}, measurement={measurement_iterations}, tokens={num_tokens_to_generate}") - + logger.info( + f"Configuration: warmup={warmup_iterations}, measurement={measurement_iterations}, tokens={num_tokens_to_generate}" + ) + try: # Create benchmark instance benchmark = LLaMABenchmark(logger) - + # Create scenarios scenarios = benchmark.create_scenarios( model_id=model_id, @@ -129,28 +138,29 @@ def run_llama(logger, output_dir, **kwargs): include_sdpa_variants=include_sdpa_variants, device=device, torch_dtype=torch_dtype, - batch_size=batch_size + batch_size=batch_size, ) - + logger.info(f"Created {len(scenarios)} benchmark scenarios") - + # Create runner and execute benchmarks runner = BenchmarkRunner(logger, output_dir) results = runner.run_benchmark(benchmark, scenarios, commit_id=commit_id) - + if not results: logger.warning("No successful benchmark results") return None - + # Save results - model_name = model_id.split('/')[-1] # Extract model name from ID + model_name = model_id.split("/")[-1] # Extract model name from ID output_file = runner.save_results(model_name, results) - + logger.info(f"LLaMA benchmark completed successfully. Results saved to: {output_file}") return output_file - + except Exception as e: logger.error(f"LLaMA benchmark failed: {e}") import traceback + logger.debug(traceback.format_exc()) - raise \ No newline at end of file + raise diff --git a/benchmark_v2/benchmark_framework.py b/benchmark_v2/benchmark_framework.py index f152c28c15f..3e4005b9f4b 100644 --- a/benchmark_v2/benchmark_framework.py +++ b/benchmark_v2/benchmark_framework.py @@ -14,28 +14,26 @@ import gc import json -import os -import subprocess -import sys -import time -import statistics -import threading -from abc import ABC, abstractmethod -from contextlib import nullcontext -from dataclasses import dataclass, field, asdict -from datetime import datetime -from typing import Any, Callable, Dict, List, Optional, Union, TypedDict import logging +import os +import statistics +import sys +import threading +import time +from abc import ABC, abstractmethod +from dataclasses import asdict, dataclass, field +from datetime import datetime +from typing import Any, Optional, TypedDict, Union +import gpustat import numpy as np import psutil -import gpustat - import torch class GPUMetrics(TypedDict): """GPU monitoring result with GPU metrics.""" + gpu_utilization_mean: float gpu_utilization_max: float gpu_utilization_min: float @@ -48,30 +46,31 @@ class GPUMetrics(TypedDict): class NoGPU(TypedDict): """GPU monitoring result without GPU metrics.""" + gpu_monitoring_status: str gpu_monitoring_reason: str class ArchAwareTimer: """Architecture-aware timer for supposedly better prescision""" - + def __init__(self, device: Optional[str] = None): """ Initialize architecture-aware timer. - + Args: device: Device to use. If None, uses current device. """ self.device = device self.use_cuda = torch.cuda.is_available() - + if self.use_cuda: if device and device != "cpu": self.device_obj = torch.device(device) else: # Fall back to CPU timing if device is CPU or CUDA not available self.use_cuda = False - + if self.use_cuda: try: # Create CUDA events for timing @@ -80,11 +79,11 @@ class ArchAwareTimer: except RuntimeError: # Fall back to CPU timing if CUDA events fail self.use_cuda = False - + if not self.use_cuda: self.start_time = None self.end_time = None - + def start(self): """Start timing.""" if self.use_cuda: @@ -92,7 +91,7 @@ class ArchAwareTimer: self.start_event.record(stream=torch.cuda.current_stream(self.device_obj)) else: self.start_time = time.perf_counter() - + def stop(self): """Stop timing.""" if self.use_cuda: @@ -100,11 +99,11 @@ class ArchAwareTimer: torch.cuda.synchronize(self.device_obj) else: self.end_time = time.perf_counter() - + def elapsed_time(self) -> float: """ Get elapsed time in seconds. - + Returns: Elapsed time in seconds """ @@ -115,17 +114,17 @@ class ArchAwareTimer: if self.start_time is None or self.end_time is None: raise RuntimeError("Timer not properly started/stopped") return self.end_time - self.start_time - + @property def timing_method(self) -> str: """Get the timing method being used.""" return "CUDA Events" if self.use_cuda else "CPU perf_counter" - + def __enter__(self): """Context manager entry.""" self.start() return self - + def __exit__(self, exc_type, exc_val, exc_tb): """Context manager exit.""" self.stop() @@ -134,6 +133,7 @@ class ArchAwareTimer: @dataclass class BenchmarkConfig: """Configuration for a single benchmark scenario.""" + name: str model_id: str variant: str = "eager" # "eager", "compiled", "kernelized" @@ -143,13 +143,13 @@ class BenchmarkConfig: device: str = "cuda" torch_dtype: str = "float16" compile_mode: Optional[str] = None # None, "default", "reduce-overhead", "max-autotune" - compile_options: Dict[str, Any] = field(default_factory=dict) + compile_options: dict[str, Any] = field(default_factory=dict) use_cache: bool = True batch_size: int = 1 sequence_length: Optional[int] = None attn_implementation: str = "sdpa" # "eager", "sdpa", "flash_attention_2" sdpa_backend: Optional[str] = None # None, "math", "flash_attention", "efficient_attention", "cudnn_attention" - custom_params: Dict[str, Any] = field(default_factory=dict) + custom_params: dict[str, Any] = field(default_factory=dict) class BenchmarkScenario: @@ -157,22 +157,22 @@ class BenchmarkScenario: A benchmark scenario that encapsulates both configuration and setup logic. This makes it easier to define and adapt benchmarks for different models. """ - + def __init__(self, name: str, config: BenchmarkConfig, description: str = ""): self.name = name self.config = config self.description = description self._setup_callbacks = [] self._teardown_callbacks = [] - + def add_setup_callback(self, callback: callable): """Add a callback to be executed during scenario setup.""" self._setup_callbacks.append(callback) - + def add_teardown_callback(self, callback: callable): """Add a callback to be executed during scenario teardown.""" self._teardown_callbacks.append(callback) - + def setup(self, model, tokenizer, logger=None): """Execute setup callbacks for this scenario.""" for callback in self._setup_callbacks: @@ -181,7 +181,7 @@ class BenchmarkScenario: except Exception as e: if logger: logger.warning(f"Setup callback failed for scenario {self.name}: {e}") - + def teardown(self, model, tokenizer, logger=None): """Execute teardown callbacks for this scenario.""" for callback in self._teardown_callbacks: @@ -190,29 +190,29 @@ class BenchmarkScenario: except Exception as e: if logger: logger.warning(f"Teardown callback failed for scenario {self.name}: {e}") - + def __repr__(self): return f"BenchmarkScenario(name='{self.name}', variant='{self.config.variant}')" - - @dataclass class TimingResult: """Result from a timing measurement.""" + time_to_first_token_seconds: Optional[float] = None latency_seconds: float = 0.0 tokens_per_second: Optional[float] = None time_per_output_token_seconds: Optional[float] = None total_tokens_generated: int = 0 - metadata: Dict[str, Any] = field(default_factory=dict) + metadata: dict[str, Any] = field(default_factory=dict) @dataclass class BenchmarkStatistics: """Statistical analysis of benchmark measurements.""" + name: str - measurements: List[float] + measurements: list[float] mean: float median: float std: float @@ -226,13 +226,13 @@ class BenchmarkStatistics: unit: str = "seconds" @classmethod - def from_measurements(cls, name: str, measurements: List[float], unit: str = "seconds") -> 'BenchmarkStatistics': + def from_measurements(cls, name: str, measurements: list[float], unit: str = "seconds") -> "BenchmarkStatistics": """Create statistics from a list of measurements.""" if not measurements: raise ValueError("Cannot create statistics from empty measurements") - + measurements_array = np.array(measurements) - + return cls( name=name, measurements=measurements, @@ -246,13 +246,14 @@ class BenchmarkStatistics: p90=float(np.percentile(measurements_array, 90)), p95=float(np.percentile(measurements_array, 95)), p99=float(np.percentile(measurements_array, 99)), - unit=unit + unit=unit, ) -@dataclass +@dataclass class HardwareInfo: """Hardware information collected during benchmarking.""" + gpu_name: str gpu_memory_total_mb: int cpu_count: int @@ -265,6 +266,7 @@ class HardwareInfo: @dataclass class BenchmarkMetadata: """Metadata collected for each benchmark run.""" + timestamp: str commit_id: str hardware_info: HardwareInfo @@ -273,8 +275,8 @@ class BenchmarkMetadata: class GPUMonitor: """Monitor GPU utilization during benchmark execution.""" - - def __init__(self, sample_interval: float = 0.1, logger: logging.Logger = None): + + def __init__(self, sample_interval: float = 0.1, logger: Optional[logging.Logger] = None): self.sample_interval = sample_interval self.logger = logger or logging.getLogger(__name__) self.stop_event = threading.Event() @@ -284,10 +286,10 @@ class GPUMonitor: self.timestamps = [] self.gpu_available = False self.warning_logged = False - + # Test GPU availability on initialization self._test_gpu_availability() - + def _test_gpu_availability(self): """Test if GPU monitoring is available.""" try: @@ -301,13 +303,13 @@ class GPUMonitor: except Exception as e: self.gpu_available = False self.logger.debug(f"GPU monitoring not available: {e}") - + def start(self): """Start monitoring GPU metrics.""" if not self.gpu_available: self.logger.debug("GPU monitoring disabled: no GPUs available") return - + # Clear the stop event to enable monitoring self.stop_event.clear() self.gpu_utilization = [] @@ -317,20 +319,17 @@ class GPUMonitor: self.thread = threading.Thread(target=self._monitor_loop) self.thread.start() self.logger.debug("GPU monitoring started") - + def stop_and_collect(self) -> Union[GPUMetrics, NoGPU]: """Stop monitoring and return collected metrics.""" if not self.gpu_available: - return NoGPU( - gpu_monitoring_status="disabled", - gpu_monitoring_reason="no_gpus_available" - ) - + return NoGPU(gpu_monitoring_status="disabled", gpu_monitoring_reason="no_gpus_available") + # Signal the monitoring thread to stop self.stop_event.set() if self.thread: self.thread.join() - + if self.gpu_utilization: metrics = GPUMetrics( gpu_utilization_mean=statistics.mean(self.gpu_utilization), @@ -340,21 +339,18 @@ class GPUMonitor: gpu_memory_used_max=max(self.gpu_memory_used), gpu_memory_used_min=min(self.gpu_memory_used), sample_count=len(self.gpu_utilization), - gpu_monitoring_status="success" + gpu_monitoring_status="success", ) self.logger.debug(f"GPU monitoring completed: {len(self.gpu_utilization)} samples collected") return metrics else: - return NoGPU( - gpu_monitoring_status="failed", - gpu_monitoring_reason="no_samples_collected" - ) - + return NoGPU(gpu_monitoring_status="failed", gpu_monitoring_reason="no_samples_collected") + def _monitor_loop(self): """Background monitoring loop using threading.Event for communication.""" consecutive_failures = 0 max_consecutive_failures = 5 - + # Continue monitoring until stop_event is set while not self.stop_event.is_set(): try: @@ -370,13 +366,13 @@ class GPUMonitor: if consecutive_failures >= max_consecutive_failures and not self.warning_logged: self.logger.warning("GPU monitoring: No GPU data returned by gpustat") self.warning_logged = True - + except Exception as e: consecutive_failures += 1 if consecutive_failures >= max_consecutive_failures and not self.warning_logged: self.logger.warning(f"GPU monitoring failed after {max_consecutive_failures} attempts: {e}") self.warning_logged = True - + # Use Event.wait() with timeout instead of time.sleep() # This allows for immediate response to stop signal while still maintaining sample interval if self.stop_event.wait(timeout=self.sample_interval): @@ -388,7 +384,7 @@ def get_hardware_info() -> HardwareInfo: """Collect hardware information.""" gpu_name = "unknown" gpu_memory_total = 0 - + try: gpu_stats = gpustat.GPUStatCollection.new_query() if gpu_stats and len(gpu_stats) > 0: @@ -397,12 +393,12 @@ def get_hardware_info() -> HardwareInfo: gpu_memory_total = gpu["memory.total"] except Exception: pass - + torch_version = torch.__version__ cuda_version = None - if hasattr(torch, 'cuda') and torch.cuda.is_available(): + if hasattr(torch, "cuda") and torch.cuda.is_available(): cuda_version = torch.version.cuda - + return HardwareInfo( gpu_name=gpu_name, gpu_memory_total_mb=gpu_memory_total, @@ -410,14 +406,14 @@ def get_hardware_info() -> HardwareInfo: memory_total_mb=int(psutil.virtual_memory().total / (1024 * 1024)), python_version=f"{sys.version.split()[0]}", torch_version=torch_version, - cuda_version=cuda_version + cuda_version=cuda_version, ) def flush_memory(): """Flush GPU memory and run garbage collection.""" gc.collect() - if hasattr(torch, 'cuda') and torch.cuda.is_available(): + if hasattr(torch, "cuda") and torch.cuda.is_available(): torch.cuda.empty_cache() torch.cuda.reset_max_memory_allocated() torch.cuda.reset_peak_memory_stats() @@ -428,7 +424,7 @@ def get_sdpa_backend(backend_name: Optional[str]): """Get the SDPA backend enum from string name.""" if backend_name is None: return None - + try: backend_map = { "math": torch.nn.attention.SDPBackend.MATH, @@ -442,18 +438,15 @@ def get_sdpa_backend(backend_name: Optional[str]): return None - - - class SDPAContext: """Context manager for SDPA kernel selection.""" - - def __init__(self, backend_name: Optional[str], logger: logging.Logger = None): + + def __init__(self, backend_name: Optional[str], logger: Optional[logging.Logger] = None): self.backend_name = backend_name self.logger = logger or logging.getLogger(__name__) self.backend = get_sdpa_backend(backend_name) if backend_name else None self.context = None - + def __enter__(self): if self.backend is not None: try: @@ -466,9 +459,11 @@ class SDPAContext: self.logger.warning(f"Failed to set SDPA backend {self.backend_name}: {e}") self.context = None elif self.backend_name and self.logger: - self.logger.debug(f"SDPA backend '{self.backend_name}' requested but not using kernel context (backend={self.backend})") + self.logger.debug( + f"SDPA backend '{self.backend_name}' requested but not using kernel context (backend={self.backend})" + ) return self - + def __exit__(self, exc_type, exc_val, exc_tb): if self.context is not None: try: @@ -481,44 +476,44 @@ class SDPAContext: class AbstractModelBenchmark(ABC): """Abstract base class for model benchmarks.""" - + def __init__(self, logger: logging.Logger): self.logger = logger self.model = None self.tokenizer = None self.device = None self.scenarios = {} # Map of scenario_name -> BenchmarkScenario - + @abstractmethod - def create_scenarios(self, **kwargs) -> Dict[str, 'BenchmarkScenario']: + def create_scenarios(self, **kwargs) -> dict[str, "BenchmarkScenario"]: """Create and return a dictionary of benchmark scenarios.""" pass - + @abstractmethod def setup_model(self, config: BenchmarkConfig) -> None: """Setup the model for benchmarking with the given configuration.""" pass - + @abstractmethod def cleanup_model(self) -> None: """Cleanup model resources.""" pass - + @abstractmethod def measure_time_to_first_token(self, config: BenchmarkConfig) -> float: """Measure time to first token generation.""" pass - + @abstractmethod def measure_latency(self, config: BenchmarkConfig) -> TimingResult: """Measure full generation latency and compute tokens/sec.""" pass - + def prepare_inputs(self, config: BenchmarkConfig) -> Any: """Prepare inputs for the model. Override if needed.""" return None - - def get_scenarios(self, **kwargs) -> Dict[str, 'BenchmarkScenario']: + + def get_scenarios(self, **kwargs) -> dict[str, "BenchmarkScenario"]: """Get benchmark scenarios. Creates them if they don't exist.""" if not self.scenarios: self.scenarios = self.create_scenarios(**kwargs) @@ -528,12 +523,12 @@ class AbstractModelBenchmark(ABC): class ModelBenchmark(AbstractModelBenchmark): """ Base class for HuggingFace Transformers model benchmarks. - + This class provides common scenario creation logic and handles the standard patterns for eager, compiled, and kernelized execution variants with different attention implementations and SDPA backends. """ - + def __init__(self, logger: logging.Logger): super().__init__(logger) self.inputs = None @@ -541,120 +536,122 @@ class ModelBenchmark(AbstractModelBenchmark): self.past_key_values = None self.config = None self._default_prompt = "Why dogs are so cute?" - + @property def default_prompt(self) -> str: """Default prompt for text generation. Override in subclasses if needed.""" return self._default_prompt - - - def get_attention_configs(self, include_sdpa_variants: bool = True) -> List[Dict[str, Any]]: + def get_attention_configs(self, include_sdpa_variants: bool = True) -> list[dict[str, Any]]: """ Get attention implementation configurations. - + Args: include_sdpa_variants: Whether to include SDPA backend variants - + Returns: List of attention configuration dictionaries """ attention_configs = [ {"attn_implementation": "eager", "sdpa_backends": [None], "desc_suffix": " with eager attention"}, ] - + # Add SDPA variants if requested if include_sdpa_variants: - attention_configs.append({ - "attn_implementation": "sdpa", - "sdpa_backends": [None, "math", "flash_attention", "efficient_attention"], - "desc_suffix": "" - }) - + attention_configs.append( + { + "attn_implementation": "sdpa", + "sdpa_backends": [None, "math", "flash_attention", "efficient_attention"], + "desc_suffix": "", + } + ) + return attention_configs - - def get_scenario_configs(self) -> List[Dict[str, Any]]: + + def get_scenario_configs(self) -> list[dict[str, Any]]: """ Get base scenario configurations. Override in subclasses to customize. - + Returns: List of scenario configuration dictionaries """ return [ # Eager variants {"variant": "eager", "compile_mode": None, "use_cache": True, "description": "Eager execution with cache"}, - # Compiled variants - {"variant": "compiled", "compile_mode": "max-autotune", "use_cache": True, "description": "Compiled with max autotune"}, - + { + "variant": "compiled", + "compile_mode": "max-autotune", + "use_cache": True, + "description": "Compiled with max autotune", + }, # Kernelized variant (if available) - {"variant": "kernelized", "compile_mode": "max-autotune", "use_cache": True, "description": "Kernelized execution"}, + { + "variant": "kernelized", + "compile_mode": "max-autotune", + "use_cache": True, + "description": "Kernelized execution", + }, ] - + def _is_kernelization_available(self) -> bool: """Check if kernelization is available. Override in subclasses.""" try: - from kernels import Mode, kernelize + from kernels import Mode, kernelize # noqa: F401 + return True except ImportError: return False - - def get_default_generation_config(self) -> Dict[str, Any]: + + def get_default_generation_config(self) -> dict[str, Any]: """Get default generation configuration. Override in subclasses for model-specific defaults.""" - return { - "do_sample": False, - "top_p": 1.0, - "temperature": 1.0 - } - - def get_model_init_kwargs(self, config: BenchmarkConfig) -> Dict[str, Any]: + return {"do_sample": False, "top_p": 1.0, "temperature": 1.0} + + def get_model_init_kwargs(self, config: BenchmarkConfig) -> dict[str, Any]: """Get model initialization kwargs. Override in subclasses for model-specific parameters.""" - return { - "torch_dtype": getattr(torch, config.torch_dtype), - "attn_implementation": config.attn_implementation - } - + return {"torch_dtype": getattr(torch, config.torch_dtype), "attn_implementation": config.attn_implementation} + def get_default_torch_dtype(self) -> str: """Get default torch dtype. Override in subclasses.""" return "float16" - + def get_default_device(self) -> str: """Get default device. Override in subclasses.""" return "cuda" - - def create_scenarios(self, **kwargs) -> Dict[str, 'BenchmarkScenario']: + + def create_scenarios(self, **kwargs) -> dict[str, "BenchmarkScenario"]: """Create benchmark scenarios for HuggingFace models.""" scenarios = {} - + # Extract parameters with model-specific defaults - model_id = kwargs.get('model_id', 'microsoft/DialoGPT-medium') - warmup_iterations = kwargs.get('warmup_iterations', 3) - measurement_iterations = kwargs.get('measurement_iterations', 5) - num_tokens_to_generate = kwargs.get('num_tokens_to_generate', 100) - include_sdpa_variants = kwargs.get('include_sdpa_variants', True) - device = kwargs.get('device', self.get_default_device()) - torch_dtype = kwargs.get('torch_dtype', self.get_default_torch_dtype()) - batch_size = kwargs.get('batch_size', 1) - + model_id = kwargs.get("model_id", "microsoft/DialoGPT-medium") + warmup_iterations = kwargs.get("warmup_iterations", 3) + measurement_iterations = kwargs.get("measurement_iterations", 5) + num_tokens_to_generate = kwargs.get("num_tokens_to_generate", 100) + include_sdpa_variants = kwargs.get("include_sdpa_variants", True) + device = kwargs.get("device", self.get_default_device()) + torch_dtype = kwargs.get("torch_dtype", self.get_default_torch_dtype()) + batch_size = kwargs.get("batch_size", 1) + # Get configurations attention_configs = self.get_attention_configs(include_sdpa_variants) scenario_configs = self.get_scenario_configs() - + # Create scenarios for each attention config and variant combination for attn_config in attention_configs: attn_implementation = attn_config["attn_implementation"] sdpa_backends = attn_config["sdpa_backends"] desc_suffix = attn_config["desc_suffix"] - + for scenario_config in scenario_configs: for sdpa_backend in sdpa_backends: # Skip kernelized if not available if scenario_config["variant"] == "kernelized" and not self._is_kernelization_available(): continue - + # Create unique config for this scenario config = BenchmarkConfig( - name=scenario_config['variant'], + name=scenario_config["variant"], model_id=model_id, variant=scenario_config["variant"], compile_mode=scenario_config["compile_mode"], @@ -666,14 +663,14 @@ class ModelBenchmark(AbstractModelBenchmark): torch_dtype=torch_dtype, batch_size=batch_size, attn_implementation=attn_implementation, - sdpa_backend=sdpa_backend if attn_implementation == "sdpa" else None + sdpa_backend=sdpa_backend if attn_implementation == "sdpa" else None, ) - + # Create scenario name scenario_name_parts = [scenario_config["variant"]] if scenario_config["compile_mode"]: scenario_name_parts.append(f"compile_{scenario_config['compile_mode']}") - + # Add attention implementation to name if attn_implementation == "eager": scenario_name_parts.append("eager_attn") @@ -682,9 +679,9 @@ class ModelBenchmark(AbstractModelBenchmark): scenario_name_parts.append(f"sdpa_{sdpa_backend}") else: scenario_name_parts.append("sdpa_default") - + scenario_name = "_".join(scenario_name_parts) - + # Create description description = scenario_config["description"] if attn_implementation == "sdpa" and sdpa_backend: @@ -693,217 +690,202 @@ class ModelBenchmark(AbstractModelBenchmark): description += " with SDPA default backend" else: description += desc_suffix - + # Create scenario - scenario = BenchmarkScenario( - name=scenario_name, - config=config, - description=description - ) - + scenario = BenchmarkScenario(name=scenario_name, config=config, description=description) + # Add setup callbacks based on variant if scenario_config["variant"] == "compiled": scenario.add_setup_callback(self._setup_compilation_callback) elif scenario_config["variant"] == "kernelized": scenario.add_setup_callback(self._setup_kernelization_callback) - + scenarios[scenario_name] = scenario - + return scenarios - + def _setup_compilation_callback(self, model, tokenizer, config, logger): """Setup callback for compilation scenarios.""" if logger: logger.info(f"Setting up compilation with mode: {config.compile_mode}") - + # Perform torch.compile if config.compile_mode is not None: - self.compiled_model = torch.compile( - model, - mode=config.compile_mode, - **config.compile_options - ) + self.compiled_model = torch.compile(model, mode=config.compile_mode, **config.compile_options) else: self.compiled_model = torch.compile(model, **config.compile_options) - + # Setup static cache for compiled mode if needed - if config.use_cache and hasattr(self, 'inputs') and self.inputs is not None: + if config.use_cache and hasattr(self, "inputs") and self.inputs is not None: self._setup_static_cache(config) - + def _setup_kernelization_callback(self, model, tokenizer, config, logger): - """Setup callback for kernelization scenarios.""" + """Setup callback for kernelization scenarios.""" if logger: logger.info("Setting up kernelization") - + try: from kernels import Mode, kernelize - self.compiled_model = kernelize( - model, - mode=Mode.INFERENCE - ) + + self.compiled_model = kernelize(model, mode=Mode.INFERENCE) except Exception as e: if logger: logger.warning(f"Failed to setup kernelized mode: {e}") logger.warning("Falling back to eager mode") config.variant = "eager" - + def _setup_static_cache(self, config: BenchmarkConfig): """Setup static cache for compiled models. Override if needed.""" - if hasattr(self, 'inputs') and self.inputs is not None: + if hasattr(self, "inputs") and self.inputs is not None: try: from transformers import StaticCache + seq_length = self.inputs["input_ids"].shape[1] - + # Get the actual device the model is on - if hasattr(self.model, 'device'): + if hasattr(self.model, "device"): cache_device = self.model.device else: cache_device = self.device - + self.past_key_values = StaticCache( config=self.model.config, max_batch_size=config.batch_size, max_cache_len=seq_length + config.num_tokens_to_generate, device=cache_device, - dtype=getattr(torch, config.torch_dtype) + dtype=getattr(torch, config.torch_dtype), ) self.logger.debug(f"StaticCache created on device: {cache_device}") except (ImportError, TypeError) as e: # StaticCache not available or incompatible, continue without it self.logger.debug(f"StaticCache setup failed: {e}, continuing without cache") self.past_key_values = None - + def setup_model(self, config: BenchmarkConfig) -> None: """Setup the HuggingFace model for benchmarking with the given configuration.""" - + self.logger.info(f"Setting up model: {config.model_id} with variant: {config.variant}") self.device = config.device self.config = config - + # Load model and tokenizer self._load_model_and_tokenizer(config) - + # Prepare inputs self._prepare_model_inputs(config) - + # Configure generation settings self._configure_generation(config) - + self.logger.info("Model setup complete") - + def _load_model_and_tokenizer(self, config: BenchmarkConfig): """Load the model and tokenizer. Override in subclasses for custom loading.""" - from transformers import AutoModelForCausalLM, AutoTokenizer, GenerationConfig - + # Load tokenizer self.tokenizer = AutoTokenizer.from_pretrained(config.model_id) if self.tokenizer.pad_token is None: self.tokenizer.pad_token = self.tokenizer.eos_token - + # Prepare generation config generation_config_dict = self.get_default_generation_config() gen_config = GenerationConfig(**generation_config_dict) - + # Load model self.logger.info("Loading model...") - - target_device = config.device + + target_device = config.device # Get model initialization kwargs model_init_kwargs = self.get_model_init_kwargs(config) - model_init_kwargs.update({ - "generation_config": gen_config - }) - - self.model = AutoModelForCausalLM.from_pretrained( - config.model_id, - **model_init_kwargs - ).eval() - + model_init_kwargs.update({"generation_config": gen_config}) + + self.model = AutoModelForCausalLM.from_pretrained(config.model_id, **model_init_kwargs).eval() + # Move model to target device self.logger.info(f"Moving model to device: {target_device}") self.model.to(target_device) self.device = target_device # Update device to match actual device used - + def _prepare_model_inputs(self, config: BenchmarkConfig): """Prepare model inputs. Override in subclasses for custom inputs.""" # Prepare inputs self.inputs = self.tokenizer(self.default_prompt, return_tensors="pt") - + # Move inputs to the same device as the model - if hasattr(self.model, 'device'): + if hasattr(self.model, "device"): # Model is on a single device model_device = self.model.device else: # Model might be distributed, use self.device which was set during model loading model_device = self.device - + self.inputs = {k: v.to(model_device) for k, v in self.inputs.items()} self.logger.debug(f"Moved inputs to device: {model_device}") - + def _configure_generation(self, config: BenchmarkConfig): """Configure generation settings.""" seq_length = self.inputs["input_ids"].shape[1] self.model.generation_config.max_length = seq_length + config.num_tokens_to_generate - + def cleanup_model(self) -> None: """Cleanup model resources.""" - if hasattr(self, 'model') and self.model is not None: + if hasattr(self, "model") and self.model is not None: del self.model self.model = None - if hasattr(self, 'compiled_model') and self.compiled_model is not None: + if hasattr(self, "compiled_model") and self.compiled_model is not None: del self.compiled_model self.compiled_model = None - if hasattr(self, 'tokenizer') and self.tokenizer is not None: + if hasattr(self, "tokenizer") and self.tokenizer is not None: del self.tokenizer self.tokenizer = None - if hasattr(self, 'past_key_values') and self.past_key_values is not None: + if hasattr(self, "past_key_values") and self.past_key_values is not None: del self.past_key_values self.past_key_values = None - + # Clear CUDA cache flush_memory() - + def measure_time_to_first_token(self, config: BenchmarkConfig) -> float: """Measure time to first token generation.""" model_to_use = self.compiled_model if self.compiled_model is not None else self.model - + # Prepare generation kwargs generation_kwargs = self._get_generation_kwargs(config, max_new_tokens=1) - + # Use CUDA timer for high-precision measurement with ArchAwareTimer(device=config.device) as timer: # Use SDPA context if specified with SDPAContext(config.sdpa_backend, self.logger): with torch.no_grad(): - outputs = model_to_use.generate(**generation_kwargs) - + _ = model_to_use.generate(**generation_kwargs) + return timer.elapsed_time() - + def measure_latency(self, config: BenchmarkConfig) -> TimingResult: """Measure full generation latency and compute tokens/sec.""" model_to_use = self.compiled_model if self.compiled_model is not None else self.model - + # Prepare generation kwargs generation_kwargs = self._get_generation_kwargs(config, max_new_tokens=config.num_tokens_to_generate) - + # Use CUDA timer for high-precision measurement with ArchAwareTimer(device=config.device) as timer: # Use SDPA context if specified with SDPAContext(config.sdpa_backend, self.logger): with torch.no_grad(): outputs = model_to_use.generate(**generation_kwargs) - + # Calculate metrics latency = timer.elapsed_time() input_length = self.inputs["input_ids"].shape[1] output_length = outputs.shape[1] tokens_generated = output_length - input_length - + tokens_per_second = tokens_generated / latency if latency > 0 else 0 time_per_output_token = latency / tokens_generated if tokens_generated > 0 else None - + return TimingResult( latency_seconds=latency, tokens_per_second=tokens_per_second, @@ -915,11 +897,11 @@ class ModelBenchmark(AbstractModelBenchmark): "variant": config.variant, "compile_mode": config.compile_mode, "attn_implementation": config.attn_implementation, - "sdpa_backend": config.sdpa_backend - } + "sdpa_backend": config.sdpa_backend, + }, ) - - def _get_generation_kwargs(self, config: BenchmarkConfig, max_new_tokens: int) -> Dict[str, Any]: + + def _get_generation_kwargs(self, config: BenchmarkConfig, max_new_tokens: int) -> dict[str, Any]: """Get generation kwargs. Override in subclasses for custom generation.""" generation_config_dict = self.get_default_generation_config() generation_kwargs = { @@ -930,76 +912,76 @@ class ModelBenchmark(AbstractModelBenchmark): "top_p": generation_config_dict.get("top_p", 1.0), "pad_token_id": self.tokenizer.pad_token_id, } - + # Handle static cache for compiled models if self.past_key_values is not None and config.variant == "compiled": try: from transformers import StaticCache + # Reset cache for each measurement seq_length = self.inputs["input_ids"].shape[1] - + # Get the actual device the model is on - if hasattr(self.model, 'device'): + if hasattr(self.model, "device"): cache_device = self.model.device else: cache_device = self.device - + fresh_cache = StaticCache( config=self.model.config, max_batch_size=config.batch_size, max_cache_len=seq_length + max_new_tokens, device=cache_device, - dtype=getattr(torch, config.torch_dtype) + dtype=getattr(torch, config.torch_dtype), ) generation_kwargs["past_key_values"] = fresh_cache except (ImportError, TypeError) as e: self.logger.debug(f"Fresh StaticCache creation failed: {e}") pass - + return generation_kwargs class BenchmarkRunner: """Main benchmark runner that coordinates benchmark execution.""" - + def __init__(self, logger: logging.Logger, output_dir: str = "benchmark_results"): self.logger = logger self.output_dir = output_dir os.makedirs(output_dir, exist_ok=True) - def run_benchmark( - self, - benchmark: ModelBenchmark, - scenarios: Dict[str, BenchmarkScenario], + self, + benchmark: ModelBenchmark, + scenarios: dict[str, BenchmarkScenario], collect_gpu_metrics: bool = True, - commit_id: Optional[str] = None - ) -> Dict[str, Dict[str, Any]]: + commit_id: Optional[str] = None, + ) -> dict[str, dict[str, Any]]: """ Run benchmarks using scenarios. - + Args: benchmark: The benchmark instance to run scenarios: Dictionary mapping scenario names to BenchmarkScenario instances collect_gpu_metrics: Whether to collect GPU utilization metrics commit_id: Git commit ID for metadata (if not provided, will auto-detect from git) - + Returns: Dictionary mapping scenario names to results with statistics """ all_results = {} - + for scenario_name, scenario in scenarios.items(): self.logger.info(f"Running benchmark scenario: {scenario_name}") config = scenario.config - + try: # Setup model for this configuration benchmark.setup_model(config) - + # Run scenario setup callbacks scenario.setup(benchmark.model, benchmark.tokenizer, self.logger) - + # Quick validation: try one measurement first to see if this scenario works try: flush_memory() @@ -1015,20 +997,20 @@ class BenchmarkRunner: except Exception: pass continue - + # Collect metadata metadata = BenchmarkMetadata( timestamp=datetime.utcnow().isoformat(), commit_id=commit_id, hardware_info=get_hardware_info(), - config=config + config=config, ) - + # Initialize GPU monitor gpu_monitor = None if collect_gpu_metrics: gpu_monitor = GPUMonitor(logger=self.logger) - + # Warmup runs self.logger.info(f"Warming up with {config.warmup_iterations} iterations...") warmup_failures = 0 @@ -1037,22 +1019,24 @@ class BenchmarkRunner: _ = benchmark.measure_latency(config) except Exception as e: warmup_failures += 1 - self.logger.warning(f"Warmup iteration {i+1} failed: {e}") - + self.logger.warning(f"Warmup iteration {i + 1} failed: {e}") + # If more than half the warmup iterations failed, skip this scenario if warmup_failures > config.warmup_iterations // 2: - self.logger.warning(f"Skipping scenario {scenario_name}: too many warmup failures ({warmup_failures}/{config.warmup_iterations})") + self.logger.warning( + f"Skipping scenario {scenario_name}: too many warmup failures ({warmup_failures}/{config.warmup_iterations})" + ) try: scenario.teardown(benchmark.model, benchmark.tokenizer, self.logger) benchmark.cleanup_model() except Exception: pass continue - + # Start GPU monitoring if gpu_monitor: gpu_monitor.start() - + # Measurement runs for latency self.logger.info(f"Measuring latency with {config.measurement_iterations} iterations...") latency_measurements = [] @@ -1060,69 +1044,83 @@ class BenchmarkRunner: tokens_per_sec_measurements = [] itl_measurements = [] # Inter-Token Latency measurement_failures = 0 - + for i in range(config.measurement_iterations): - try: + try: # Measure time to first token ttft = benchmark.measure_time_to_first_token(config) ttft_measurements.append(ttft) - + # Measure full latency timing_result = benchmark.measure_latency(config) latency_measurements.append(timing_result.latency_seconds) - + if timing_result.tokens_per_second is not None: tokens_per_sec_measurements.append(timing_result.tokens_per_second) - + if timing_result.time_per_output_token_seconds is not None: itl_measurements.append(timing_result.time_per_output_token_seconds) - - itl_str = f", itl={timing_result.time_per_output_token_seconds:.4f}s/token" if timing_result.time_per_output_token_seconds else "" - self.logger.debug(f"Iteration {i+1}: latency={timing_result.latency_seconds:.4f}s, ttft={ttft:.4f}s{itl_str}") - + + itl_str = ( + f", itl={timing_result.time_per_output_token_seconds:.4f}s/token" + if timing_result.time_per_output_token_seconds + else "" + ) + self.logger.debug( + f"Iteration {i + 1}: latency={timing_result.latency_seconds:.4f}s, ttft={ttft:.4f}s{itl_str}" + ) + except Exception as e: measurement_failures += 1 - self.logger.warning(f"Measurement iteration {i+1} failed: {e}") - + self.logger.warning(f"Measurement iteration {i + 1} failed: {e}") + # Stop GPU monitoring gpu_metrics = {} if gpu_monitor: gpu_metrics = gpu_monitor.stop_and_collect() - + # If we don't have enough successful measurements, skip this scenario if not latency_measurements or len(latency_measurements) < config.measurement_iterations // 2: - self.logger.warning(f"Skipping scenario {scenario_name}: insufficient successful measurements ({len(latency_measurements)}/{config.measurement_iterations})") + self.logger.warning( + f"Skipping scenario {scenario_name}: insufficient successful measurements ({len(latency_measurements)}/{config.measurement_iterations})" + ) try: scenario.teardown(benchmark.model, benchmark.tokenizer, self.logger) benchmark.cleanup_model() except Exception: pass continue - + # Calculate statistics scenario_results = { "metadata": asdict(metadata), "measurements": {}, "gpu_metrics": gpu_metrics, - "scenario_description": scenario.description + "scenario_description": scenario.description, } - + if latency_measurements: latency_stats = BenchmarkStatistics.from_measurements("latency_seconds", latency_measurements) scenario_results["measurements"]["latency_seconds"] = asdict(latency_stats) - + if ttft_measurements: - ttft_stats = BenchmarkStatistics.from_measurements("time_to_first_token_seconds", ttft_measurements) + ttft_stats = BenchmarkStatistics.from_measurements( + "time_to_first_token_seconds", ttft_measurements + ) scenario_results["measurements"]["time_to_first_token_seconds"] = asdict(ttft_stats) - + if tokens_per_sec_measurements: - tps_stats = BenchmarkStatistics.from_measurements("tokens_per_second", tokens_per_sec_measurements, "tokens/sec") + tps_stats = BenchmarkStatistics.from_measurements( + "tokens_per_second", tokens_per_sec_measurements, "tokens/sec" + ) scenario_results["measurements"]["tokens_per_second"] = asdict(tps_stats) - + if itl_measurements: - itl_stats = BenchmarkStatistics.from_measurements("time_per_output_token_seconds", itl_measurements, "seconds/token") + itl_stats = BenchmarkStatistics.from_measurements( + "time_per_output_token_seconds", itl_measurements, "seconds/token" + ) scenario_results["measurements"]["time_per_output_token_seconds"] = asdict(itl_stats) - + # Log summary if latency_measurements: self.logger.info(f"Latency: {latency_stats.mean:.4f}±{latency_stats.std:.4f}s (mean±std)") @@ -1132,25 +1130,26 @@ class BenchmarkRunner: self.logger.info(f"Throughput: {tps_stats.mean:.2f}±{tps_stats.std:.2f} tokens/sec (mean±std)") if itl_measurements: self.logger.info(f"ITL: {itl_stats.mean:.4f}±{itl_stats.std:.4f}s/token (mean±std)") - + # Add note about partial results if some measurements failed if measurement_failures > 0: scenario_results["warnings"] = [f"Some measurements failed ({measurement_failures} failures)"] self.logger.info(f"Scenario completed with {measurement_failures} measurement failures") - + # Run scenario teardown callbacks scenario.teardown(benchmark.model, benchmark.tokenizer, self.logger) - + # Cleanup model benchmark.cleanup_model() - + all_results[scenario_name] = scenario_results - + except Exception as e: self.logger.warning(f"Skipping scenario {scenario_name}: setup failed - {e}") import traceback + self.logger.debug(traceback.format_exc()) - + # Try to clean up if possible try: scenario.teardown(benchmark.model, benchmark.tokenizer, self.logger) @@ -1164,41 +1163,37 @@ class BenchmarkRunner: benchmark.cleanup_model() except Exception as cleanup_error: self.logger.warning(f"Cleanup failed for scenario {scenario_name}: {cleanup_error}") - + flush_memory() - + return all_results - - def save_results(self, model_name: str, results: Dict[str, Dict[str, Any]]) -> str: + + def save_results(self, model_name: str, results: dict[str, dict[str, Any]]) -> str: """Save benchmark results to JSON file.""" # Create model-specific subdirectory model_dir = os.path.join(self.output_dir, model_name) os.makedirs(model_dir, exist_ok=True) - + # Create filename with timestamp timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"{model_name}_benchmark_{timestamp}.json" filepath = os.path.join(model_dir, filename) - + # Prepare output structure - output_data = { - "model_name": model_name, - "benchmark_scenarios": [] - } - + output_data = {"model_name": model_name, "benchmark_scenarios": []} + for config_name, config_results in results.items(): scenario = { "scenario_name": config_name, "metadata": config_results["metadata"], "measurements": config_results["measurements"], - "gpu_metrics": config_results.get("gpu_metrics", {}) + "gpu_metrics": config_results.get("gpu_metrics", {}), } output_data["benchmark_scenarios"].append(scenario) - + # Save to JSON file - with open(filepath, 'w') as f: + with open(filepath, "w") as f: json.dump(output_data, f, indent=2, default=str) - + self.logger.info(f"Results saved to {filepath}") return filepath - \ No newline at end of file diff --git a/benchmark_v2/run_benchmarks.py b/benchmark_v2/run_benchmarks.py index 9a147b5dde6..26c816b9d16 100755 --- a/benchmark_v2/run_benchmarks.py +++ b/benchmark_v2/run_benchmarks.py @@ -14,350 +14,304 @@ # limitations under the License. """ -Top-level benchmarking script that automatically discovers and runs all benchmarks +Top-level benchmarking script that automatically discovers and runs all benchmarks in the ./benches directory, organizing outputs into model-specific subfolders. """ import argparse import importlib.util +import json import logging import os import sys -import json from datetime import datetime from pathlib import Path -from typing import Dict, List, Any, Optional +from typing import Any, Optional def setup_logging(log_level: str = "INFO", enable_file_logging: bool = False) -> logging.Logger: """Setup logging configuration.""" numeric_level = getattr(logging, log_level.upper(), None) if not isinstance(numeric_level, int): - raise ValueError(f'Invalid log level: {log_level}') - + raise ValueError(f"Invalid log level: {log_level}") + handlers = [logging.StreamHandler(sys.stdout)] - + if enable_file_logging: - handlers.append( - logging.FileHandler(f'benchmark_run_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log') - ) - + handlers.append(logging.FileHandler(f"benchmark_run_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log")) + logging.basicConfig( - level=numeric_level, - format='[%(levelname)s - %(asctime)s] %(name)s: %(message)s', - handlers=handlers + level=numeric_level, format="[%(levelname)s - %(asctime)s] %(name)s: %(message)s", handlers=handlers ) - + return logging.getLogger(__name__) -def discover_benchmarks(benches_dir: str) -> List[Dict[str, Any]]: +def discover_benchmarks(benches_dir: str) -> list[dict[str, Any]]: """ Discover all benchmark modules in the benches directory. - + Returns: List of dictionaries containing benchmark module info """ benchmarks = [] benches_path = Path(benches_dir) - + if not benches_path.exists(): raise FileNotFoundError(f"Benches directory not found: {benches_dir}") - + for py_file in benches_path.glob("*.py"): if py_file.name.startswith("__"): continue - + module_name = py_file.stem - + try: # Import the module spec = importlib.util.spec_from_file_location(module_name, py_file) module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) - + # Check if it has a benchmark runner function - if hasattr(module, f'run_{module_name}'): - benchmarks.append({ - 'name': module_name, - 'path': str(py_file), - 'module': module, - 'runner_function': getattr(module, f'run_{module_name}') - }) - elif hasattr(module, 'run_benchmark'): - benchmarks.append({ - 'name': module_name, - 'path': str(py_file), - 'module': module, - 'runner_function': getattr(module, 'run_benchmark') - }) + if hasattr(module, f"run_{module_name}"): + benchmarks.append( + { + "name": module_name, + "path": str(py_file), + "module": module, + "runner_function": getattr(module, f"run_{module_name}"), + } + ) + elif hasattr(module, "run_benchmark"): + benchmarks.append( + { + "name": module_name, + "path": str(py_file), + "module": module, + "runner_function": getattr(module, "run_benchmark"), + } + ) else: logging.warning(f"No runner function found in {py_file}") - + except Exception as e: logging.error(f"Failed to import {py_file}: {e}") - + return benchmarks def run_single_benchmark( - benchmark_info: Dict[str, Any], - output_dir: str, - logger: logging.Logger, - **kwargs + benchmark_info: dict[str, Any], output_dir: str, logger: logging.Logger, **kwargs ) -> Optional[str]: """ Run a single benchmark and return the output file path. - + Args: benchmark_info: Dictionary containing benchmark module info output_dir: Base output directory logger: Logger instance **kwargs: Additional arguments to pass to the benchmark - + Returns: Path to the output file if successful, None otherwise """ - benchmark_name = benchmark_info['name'] - runner_func = benchmark_info['runner_function'] - + benchmark_name = benchmark_info["name"] + runner_func = benchmark_info["runner_function"] + logger.info(f"Running benchmark: {benchmark_name}") - + try: # Check function signature to determine what arguments to pass import inspect + sig = inspect.signature(runner_func) - + # Prepare arguments based on function signature - func_kwargs = { - 'logger': logger, - 'output_dir': output_dir - } - + func_kwargs = {"logger": logger, "output_dir": output_dir} + # Add other kwargs if the function accepts them for param_name in sig.parameters: if param_name in kwargs: func_kwargs[param_name] = kwargs[param_name] - + # Filter kwargs to only include parameters the function accepts # If function has **kwargs, include all provided kwargs has_var_kwargs = any(param.kind == param.VAR_KEYWORD for param in sig.parameters.values()) if has_var_kwargs: valid_kwargs = {**func_kwargs, **kwargs} else: - valid_kwargs = {k: v for k, v in func_kwargs.items() - if k in sig.parameters} - + valid_kwargs = {k: v for k, v in func_kwargs.items() if k in sig.parameters} + # Run the benchmark result = runner_func(**valid_kwargs) - + if isinstance(result, str): # Function returned a file path return result else: logger.info(f"Benchmark {benchmark_name} completed successfully") return "completed" - + except Exception as e: logger.error(f"Benchmark {benchmark_name} failed: {e}") import traceback + logger.debug(traceback.format_exc()) return None -def generate_summary_report( - output_dir: str, - benchmark_results: Dict[str, Any], - logger: logging.Logger -) -> str: +def generate_summary_report(output_dir: str, benchmark_results: dict[str, Any], logger: logging.Logger) -> str: """Generate a summary report of all benchmark runs.""" timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") summary_file = os.path.join(output_dir, f"benchmark_summary_{timestamp}.json") - + summary_data = { "run_metadata": { "timestamp": datetime.utcnow().isoformat(), "total_benchmarks": len(benchmark_results), "successful_benchmarks": len([r for r in benchmark_results.values() if r is not None]), - "failed_benchmarks": len([r for r in benchmark_results.values() if r is None]) + "failed_benchmarks": len([r for r in benchmark_results.values() if r is None]), }, "benchmark_results": benchmark_results, - "output_directory": output_dir + "output_directory": output_dir, } - - with open(summary_file, 'w') as f: + + with open(summary_file, "w") as f: json.dump(summary_data, f, indent=2, default=str) - + logger.info(f"Summary report saved to: {summary_file}") return summary_file def main(): """Main entry point for the benchmarking script.""" - parser = argparse.ArgumentParser( - description="Run all benchmarks in the ./benches directory" - ) - + parser = argparse.ArgumentParser(description="Run all benchmarks in the ./benches directory") + parser.add_argument( "--output-dir", type=str, default="benchmark_results", - help="Base output directory for benchmark results (default: benchmark_results)" + help="Base output directory for benchmark results (default: benchmark_results)", ) - + parser.add_argument( "--benches-dir", type=str, default="./benches", - help="Directory containing benchmark implementations (default: ./benches)" + help="Directory containing benchmark implementations (default: ./benches)", ) - + parser.add_argument( "--log-level", type=str, choices=["DEBUG", "INFO", "WARNING", "ERROR"], default="INFO", - help="Logging level (default: INFO)" + help="Logging level (default: INFO)", ) - + + parser.add_argument("--model-id", type=str, help="Specific model ID to benchmark (if supported by benchmarks)") + + parser.add_argument("--warmup-iterations", type=int, default=3, help="Number of warmup iterations (default: 3)") + parser.add_argument( - "--model-id", - type=str, - help="Specific model ID to benchmark (if supported by benchmarks)" + "--measurement-iterations", type=int, default=5, help="Number of measurement iterations (default: 5)" ) - - parser.add_argument( - "--warmup-iterations", - type=int, - default=3, - help="Number of warmup iterations (default: 3)" - ) - - parser.add_argument( - "--measurement-iterations", - type=int, - default=5, - help="Number of measurement iterations (default: 5)" - ) - + parser.add_argument( "--num-tokens-to-generate", type=int, default=100, - help="Number of tokens to generate in benchmarks (default: 100)" + help="Number of tokens to generate in benchmarks (default: 100)", ) - + + parser.add_argument("--include", type=str, nargs="*", help="Only run benchmarks matching these names") + + parser.add_argument("--exclude", type=str, nargs="*", help="Exclude benchmarks matching these names") + + parser.add_argument("--enable-mock", action="store_true", help="Enable mock benchmark (skipped by default)") + + parser.add_argument("--enable-file-logging", action="store_true", help="Enable file logging (disabled by default)") + parser.add_argument( - "--include", - type=str, - nargs="*", - help="Only run benchmarks matching these names" + "--commit-id", type=str, help="Git commit ID for metadata (if not provided, will auto-detect from git)" ) - - parser.add_argument( - "--exclude", - type=str, - nargs="*", - help="Exclude benchmarks matching these names" - ) - - parser.add_argument( - "--enable-mock", - action="store_true", - help="Enable mock benchmark (skipped by default)" - ) - - parser.add_argument( - "--enable-file-logging", - action="store_true", - help="Enable file logging (disabled by default)" - ) - - parser.add_argument( - "--commit-id", - type=str, - help="Git commit ID for metadata (if not provided, will auto-detect from git)" - ) - + args = parser.parse_args() - + # Setup logging logger = setup_logging(args.log_level, args.enable_file_logging) - + logger.info("Starting benchmark discovery and execution") logger.info(f"Output directory: {args.output_dir}") logger.info(f"Benches directory: {args.benches_dir}") - + # Create output directory os.makedirs(args.output_dir, exist_ok=True) - + try: # Discover benchmarks benchmarks = discover_benchmarks(args.benches_dir) logger.info(f"Discovered {len(benchmarks)} benchmark(s): {[b['name'] for b in benchmarks]}") - + if not benchmarks: logger.warning("No benchmarks found!") return 1 - + # Filter benchmarks based on include/exclude filtered_benchmarks = benchmarks - + if args.include: - filtered_benchmarks = [b for b in filtered_benchmarks - if any(pattern in b['name'] for pattern in args.include)] + filtered_benchmarks = [ + b for b in filtered_benchmarks if any(pattern in b["name"] for pattern in args.include) + ] logger.info(f"Filtered to include: {[b['name'] for b in filtered_benchmarks]}") - + if args.exclude: - filtered_benchmarks = [b for b in filtered_benchmarks - if not any(pattern in b['name'] for pattern in args.exclude)] + filtered_benchmarks = [ + b for b in filtered_benchmarks if not any(pattern in b["name"] for pattern in args.exclude) + ] logger.info(f"After exclusion: {[b['name'] for b in filtered_benchmarks]}") - + if not filtered_benchmarks: logger.warning("No benchmarks remaining after filtering!") return 1 - + # Prepare common kwargs for benchmarks benchmark_kwargs = { - 'warmup_iterations': args.warmup_iterations, - 'measurement_iterations': args.measurement_iterations, - 'num_tokens_to_generate': args.num_tokens_to_generate + "warmup_iterations": args.warmup_iterations, + "measurement_iterations": args.measurement_iterations, + "num_tokens_to_generate": args.num_tokens_to_generate, } - + if args.model_id: - benchmark_kwargs['model_id'] = args.model_id - + benchmark_kwargs["model_id"] = args.model_id + # Add enable_mock flag for mock benchmark - benchmark_kwargs['enable_mock'] = args.enable_mock - + benchmark_kwargs["enable_mock"] = args.enable_mock + # Add commit_id if provided if args.commit_id: - benchmark_kwargs['commit_id'] = args.commit_id - + benchmark_kwargs["commit_id"] = args.commit_id + # Run benchmarks benchmark_results = {} successful_count = 0 - + for benchmark_info in filtered_benchmarks: - result = run_single_benchmark( - benchmark_info, - args.output_dir, - logger, - **benchmark_kwargs - ) - - benchmark_results[benchmark_info['name']] = result - + result = run_single_benchmark(benchmark_info, args.output_dir, logger, **benchmark_kwargs) + + benchmark_results[benchmark_info["name"]] = result + if result is not None: successful_count += 1 - + # Generate summary report summary_file = generate_summary_report(args.output_dir, benchmark_results, logger) - + # Final summary total_benchmarks = len(filtered_benchmarks) failed_count = total_benchmarks - successful_count - + logger.info("=" * 60) logger.info("BENCHMARK RUN SUMMARY") logger.info("=" * 60) @@ -366,20 +320,21 @@ def main(): logger.info(f"Failed: {failed_count}") logger.info(f"Output directory: {args.output_dir}") logger.info(f"Summary report: {summary_file}") - + if failed_count > 0: logger.warning(f"{failed_count} benchmark(s) failed. Check logs for details.") return 1 else: logger.info("All benchmarks completed successfully!") return 0 - + except Exception as e: logger.error(f"Benchmark run failed: {e}") import traceback + logger.debug(traceback.format_exc()) return 1 if __name__ == "__main__": - sys.exit(main()) \ No newline at end of file + sys.exit(main()) diff --git a/scripts/check_tokenizers.py b/scripts/check_tokenizers.py index a099d794c2b..38e6965f4f8 100644 --- a/scripts/check_tokenizers.py +++ b/scripts/check_tokenizers.py @@ -4,8 +4,8 @@ import datasets import transformers from transformers.convert_slow_tokenizer import SLOW_TO_FAST_CONVERTERS -from transformers.utils import logging from transformers.tokenization_utils_base import PreTrainedTokenizerBase +from transformers.utils import logging logging.set_verbosity_info() @@ -22,7 +22,9 @@ imperfect = 0 wrong = 0 -def check_diff(spm_diff: list[int], tok_diff: list[int], slow: PreTrainedTokenizerBase, fast: PreTrainedTokenizerBase) -> bool: +def check_diff( + spm_diff: list[int], tok_diff: list[int], slow: PreTrainedTokenizerBase, fast: PreTrainedTokenizerBase +) -> bool: if spm_diff == list(reversed(tok_diff)): # AAA -> AA+A vs A+AA case. return True @@ -54,7 +56,9 @@ def check_LTR_mark(line: str, idx: int, fast: PreTrainedTokenizerBase) -> bool: return False -def check_details(line: str, spm_ids: list[int], tok_ids: list[int], slow: PreTrainedTokenizerBase, fast: PreTrainedTokenizerBase) -> bool: +def check_details( + line: str, spm_ids: list[int], tok_ids: list[int], slow: PreTrainedTokenizerBase, fast: PreTrainedTokenizerBase +) -> bool: # Encoding can be the same with same result AAA -> A + AA vs AA + A # We can check that we use at least exactly the same number of tokens. for i, (spm_id, tok_id) in enumerate(zip(spm_ids, tok_ids)): @@ -90,7 +94,9 @@ def check_details(line: str, spm_ids: list[int], tok_ids: list[int], slow: PreTr if tok_ids[first + k : first + k + min_width] == spm_ids[first + i : first + i + min_width] ] for j in possible_matches: - if check_diff(spm_ids[first : first + i], tok_ids[first : first + j], slow, fast) and check_details( + if check_diff( + spm_ids[first : first + i], tok_ids[first : first + j], slow, fast + ) and check_details( line, spm_ids[first + i : last], tok_ids[first + j : last], @@ -140,9 +146,9 @@ def test_string(slow: PreTrainedTokenizerBase, fast: PreTrainedTokenizerBase, te if skip_assert: return - assert ( - slow_ids == fast_ids - ), f"line {text} : \n\n{slow_ids}\n{fast_ids}\n\n{slow.tokenize(text)}\n{fast.tokenize(text)}" + assert slow_ids == fast_ids, ( + f"line {text} : \n\n{slow_ids}\n{fast_ids}\n\n{slow.tokenize(text)}\n{fast.tokenize(text)}" + ) def test_tokenizer(slow: PreTrainedTokenizerBase, fast: PreTrainedTokenizerBase) -> None: diff --git a/scripts/stale.py b/scripts/stale.py index bf7c6670c43..81bab2a5b24 100644 --- a/scripts/stale.py +++ b/scripts/stale.py @@ -15,6 +15,7 @@ Script to close stale issue. Taken in part from the AllenNLP repository. https://github.com/allenai/allennlp. """ + import os from datetime import datetime as dt @@ -39,10 +40,11 @@ def main(): for i, issue in enumerate(open_issues): print(i, issue) - comments = sorted(list(issue.get_comments()), key=lambda i: i.created_at, reverse=True) + comments = sorted(issue.get_comments(), key=lambda i: i.created_at, reverse=True) last_comment = comments[0] if len(comments) > 0 else None if ( - last_comment is not None and last_comment.user.login == "github-actions[bot]" + last_comment is not None + and last_comment.user.login == "github-actions[bot]" and (dt.utcnow() - issue.updated_at.replace(tzinfo=None)).days > 7 and (dt.utcnow() - issue.created_at.replace(tzinfo=None)).days >= 30 and not any(label.name.lower() in LABELS_TO_EXEMPT for label in issue.get_labels())