[CI] Fail subprocess tests with root-cause error (#23795)

Signed-off-by: Nick Hill <nhill@redhat.com>
This commit is contained in:
Nick Hill
2025-09-10 13:53:21 -07:00
committed by GitHub
parent a0933c3bd6
commit 4db4426404
6 changed files with 138 additions and 33 deletions

View File

@ -21,6 +21,7 @@ ray[cgraph,default]>=2.48.0 # Ray Compiled Graph, required by pipeline paralleli
sentence-transformers # required for embedding tests
soundfile # required for audio tests
jiwer # required for audio tests
tblib # for pickling test exceptions
timm >=1.0.17 # required for internvl and gemma3n-mm test
torch==2.8.0
torchaudio==2.8.0

View File

@ -137,7 +137,7 @@ contourpy==1.3.0
# via matplotlib
cramjam==2.9.0
# via fastparquet
cupy-cuda12x==13.3.0
cupy-cuda12x==13.6.0
# via ray
cycler==0.12.1
# via matplotlib
@ -1032,6 +1032,8 @@ tabledata==1.3.3
# via pytablewriter
tabulate==0.9.0
# via sacrebleu
tblib==3.1.0
# via -r requirements/test.in
tcolorpy==0.1.6
# via pytablewriter
tenacity==9.0.0

View File

@ -1,6 +1,7 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import copyreg
import os
import subprocess
import sys
@ -10,6 +11,30 @@ from pathlib import Path
import pytest
import requests
import urllib3.exceptions
def _pickle_new_connection_error(obj):
"""Custom pickler for NewConnectionError to fix tblib compatibility."""
# Extract the original message by removing the "conn: " prefix
full_message = obj.args[0] if obj.args else ""
if ': ' in full_message:
# Split off the connection part and keep the actual message
_, actual_message = full_message.split(': ', 1)
else:
actual_message = full_message
return _unpickle_new_connection_error, (actual_message, )
def _unpickle_new_connection_error(message):
"""Custom unpickler for NewConnectionError."""
# Create with None as conn and the actual message
return urllib3.exceptions.NewConnectionError(None, message)
# Register the custom pickle/unpickle functions for tblib compatibility
copyreg.pickle(urllib3.exceptions.NewConnectionError,
_pickle_new_connection_error)
def _query_server(prompt: str, max_tokens: int = 5) -> dict:
@ -52,6 +77,7 @@ def api_server(distributed_executor_backend: str):
uvicorn_process.terminate()
@pytest.mark.timeout(300)
@pytest.mark.parametrize("distributed_executor_backend", ["mp", "ray"])
def test_api_server(api_server, distributed_executor_backend: str):
"""

View File

@ -1,5 +1,15 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
# ruff: noqa
from tblib import pickling_support
# Install support for pickling exceptions so that we can nicely propagate
# failures from tests running in a subprocess.
# This should be run before any custom exception subclasses are defined.
pickling_support.install()
import http.server
import json
import math

View File

@ -2,6 +2,7 @@
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import asyncio
import contextlib
import copy
import functools
import importlib
@ -13,7 +14,7 @@ import sys
import tempfile
import time
import warnings
from contextlib import contextmanager, suppress
from contextlib import ExitStack, contextmanager, suppress
from multiprocessing import Process
from pathlib import Path
from typing import Any, Callable, Literal, Optional, Union
@ -800,43 +801,106 @@ _P = ParamSpec("_P")
def fork_new_process_for_each_test(
f: Callable[_P, None]) -> Callable[_P, None]:
func: Callable[_P, None]) -> Callable[_P, None]:
"""Decorator to fork a new process for each test function.
See https://github.com/vllm-project/vllm/issues/7053 for more details.
"""
@functools.wraps(f)
@functools.wraps(func)
def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> None:
# Make the process the leader of its own process group
# to avoid sending SIGTERM to the parent process
os.setpgrp()
from _pytest.outcomes import Skipped
pid = os.fork()
print(f"Fork a new process to run a test {pid}")
if pid == 0:
try:
f(*args, **kwargs)
except Skipped as e:
# convert Skipped to exit code 0
print(str(e))
os._exit(0)
except Exception:
import traceback
traceback.print_exc()
os._exit(1)
# Create a unique temporary file to store exception info from child
# process. Use test function name and process ID to avoid collisions.
with tempfile.NamedTemporaryFile(
delete=False,
mode='w+b',
prefix=f"vllm_test_{func.__name__}_{os.getpid()}_",
suffix=".exc") as exc_file, ExitStack() as delete_after:
exc_file_path = exc_file.name
delete_after.callback(os.remove, exc_file_path)
pid = os.fork()
print(f"Fork a new process to run a test {pid}")
if pid == 0:
# Parent process responsible for deleting, don't delete
# in child.
delete_after.pop_all()
try:
func(*args, **kwargs)
except Skipped as e:
# convert Skipped to exit code 0
print(str(e))
os._exit(0)
except Exception as e:
import traceback
tb_string = traceback.format_exc()
# Try to serialize the exception object first
exc_to_serialize: dict[str, Any]
try:
# First, try to pickle the actual exception with
# its traceback.
exc_to_serialize = {'pickled_exception': e}
# Test if it can be pickled
cloudpickle.dumps(exc_to_serialize)
except (Exception, KeyboardInterrupt):
# Fall back to string-based approach.
exc_to_serialize = {
'exception_type': type(e).__name__,
'exception_msg': str(e),
'traceback': tb_string,
}
try:
with open(exc_file_path, 'wb') as f:
cloudpickle.dump(exc_to_serialize, f)
except Exception:
# Fallback: just print the traceback.
print(tb_string)
os._exit(1)
else:
os._exit(0)
else:
os._exit(0)
else:
pgid = os.getpgid(pid)
_pid, _exitcode = os.waitpid(pid, 0)
# ignore SIGTERM signal itself
old_signal_handler = signal.signal(signal.SIGTERM, signal.SIG_IGN)
# kill all child processes
os.killpg(pgid, signal.SIGTERM)
# restore the signal handler
signal.signal(signal.SIGTERM, old_signal_handler)
assert _exitcode == 0, (f"function {f} failed when called with"
f" args {args} and kwargs {kwargs}")
pgid = os.getpgid(pid)
_pid, _exitcode = os.waitpid(pid, 0)
# ignore SIGTERM signal itself
old_signal_handler = signal.signal(signal.SIGTERM,
signal.SIG_IGN)
# kill all child processes
os.killpg(pgid, signal.SIGTERM)
# restore the signal handler
signal.signal(signal.SIGTERM, old_signal_handler)
if _exitcode != 0:
# Try to read the exception from the child process
exc_info = {}
if os.path.exists(exc_file_path):
with contextlib.suppress(Exception), \
open(exc_file_path, 'rb') as f:
exc_info = cloudpickle.load(f)
if (original_exception :=
exc_info.get('pickled_exception')) is not None:
# Re-raise the actual exception object if it was
# successfully pickled.
assert isinstance(original_exception, Exception)
raise original_exception
if (original_tb := exc_info.get("traceback")) is not None:
# Use string-based traceback for fallback case
raise AssertionError(
f"Test {func.__name__} failed when called with"
f" args {args} and kwargs {kwargs}"
f" (exit code: {_exitcode}):\n{original_tb}"
) from None
# Fallback to the original generic error
raise AssertionError(
f"function {func.__name__} failed when called with"
f" args {args} and kwargs {kwargs}"
f" (exit code: {_exitcode})") from None
return wrapper

View File

@ -117,10 +117,12 @@ class RayDistributedExecutor(DistributedExecutorBase):
self.driver_worker.execute_method)
def shutdown(self) -> None:
logger.info(
"Shutting down Ray distributed executor. If you see error log "
"from logging.cc regarding SIGTERM received, please ignore because "
"this is the expected termination process in Ray.")
if logger:
# Somehow logger can be None here.
logger.info(
"Shutting down Ray distributed executor. If you see error log "
"from logging.cc regarding SIGTERM received, please ignore "
"because this is the expected termination process in Ray.")
if hasattr(self, "forward_dag") and self.forward_dag is not None:
self.forward_dag.teardown()
import ray