Skip to content

vllm.entrypoints.openai.engine.serving

OpenAIServing

Bases: BeamSearchOnlineMixin

Source code in vllm/entrypoints/openai/engine/serving.py
class OpenAIServing(BeamSearchOnlineMixin):
    request_id_prefix: ClassVar[str] = """
    A short string prepended to every request’s ID.
    """

    def __init__(
        self,
        engine_client: EngineClient,
        models: OpenAIServingModels,
        *,
        request_logger: RequestLogger | None,
        return_tokens_as_token_ids: bool = False,
    ):
        super().__init__()

        self.engine_client = engine_client
        self.models = models

        self.request_logger = request_logger
        self.return_tokens_as_token_ids = return_tokens_as_token_ids

        self.model_config = engine_client.model_config
        self.renderer = engine_client.renderer
        self.input_processor = engine_client.input_processor
        vllm_config = getattr(engine_client, "vllm_config", None)
        kv_transfer_config = getattr(vllm_config, "kv_transfer_config", None)
        self.has_kv_connector = kv_transfer_config is not None

        # Computed once at startup (cached by ``vllm_config`` identity) and
        # stamped on non-streaming responses. Streaming chunks deliberately
        # omit it to avoid per-chunk overhead.
        from vllm.entrypoints.serve.utils.fingerprint import get_system_fingerprint

        try:
            self.system_fingerprint: str | None = get_system_fingerprint(
                engine_client.vllm_config
            )
        except Exception:
            # Never fail server startup over the fingerprint.
            self.system_fingerprint = None

    @staticmethod
    def create_error_response(
        message: str | Exception,
        err_type: str = "BadRequestError",
        status_code: HTTPStatus = HTTPStatus.BAD_REQUEST,
        param: str | None = None,
    ) -> ErrorResponse:
        return create_error_response(message, err_type, status_code, param)

    def create_streaming_error_response(
        self,
        message: str | Exception,
        err_type: str = "BadRequestError",
        status_code: HTTPStatus = HTTPStatus.BAD_REQUEST,
        param: str | None = None,
    ) -> str:
        json_str = json.dumps(
            self.create_error_response(
                message=message,
                err_type=err_type,
                status_code=status_code,
                param=param,
            ).model_dump()
        )
        return json_str

    def _raise_if_error(self, finish_reason: str | None, request_id: str) -> None:
        """Raise GenerationError if finish_reason indicates an error."""
        if finish_reason == "error":
            logger.error(
                "Request %s failed with an internal error during generation",
                request_id,
            )
            raise GenerationError("Internal server error")

    def _convert_generation_error_to_streaming_response(
        self, e: GenerationError
    ) -> str:
        """Convert GenerationError to streaming error response."""
        return self.create_streaming_error_response(
            str(e),
            err_type="InternalServerError",
            status_code=e.status_code,
        )

    async def _check_model(
        self,
        request: AnyRequest,
    ) -> ErrorResponse | None:
        error_response = None

        if self._is_model_supported(request.model):
            return None
        if request.model in self.models.lora_requests:
            return None
        if (
            envs.VLLM_ALLOW_RUNTIME_LORA_UPDATING
            and request.model
            and (load_result := await self.models.resolve_lora(request.model))
        ):
            if isinstance(load_result, LoRARequest):
                return None
            if (
                isinstance(load_result, ErrorResponse)
                and load_result.error.code == HTTPStatus.BAD_REQUEST.value
            ):
                error_response = load_result

        return error_response or self.create_error_response(
            message=f"The model `{request.model}` does not exist.",
            err_type="NotFoundError",
            status_code=HTTPStatus.NOT_FOUND,
            param="model",
        )

    def _get_active_default_mm_loras(self, request: AnyRequest) -> LoRARequest | None:
        """Determine if there are any active default multimodal loras."""
        # TODO: Currently this is only enabled for chat completions
        # to be better aligned with only being enabled for .generate
        # when run offline. It would be nice to support additional
        # tasks types in the future.
        message_types = self._get_message_types(request)
        default_mm_loras = set()

        for lora in self.models.lora_requests.values():
            # Best effort match for default multimodal lora adapters;
            # There is probably a better way to do this, but currently
            # this matches against the set of 'types' in any content lists
            # up until '_', e.g., to match audio_url -> audio
            if lora.lora_name in message_types:
                default_mm_loras.add(lora)

        # Currently only support default modality specific loras if
        # we have exactly one lora matched on the request.
        if len(default_mm_loras) == 1:
            return default_mm_loras.pop()
        return None

    def _maybe_get_adapters(
        self,
        request: AnyRequest,
        supports_default_mm_loras: bool = False,
    ) -> LoRARequest | None:
        if request.model in self.models.lora_requests:
            return self.models.lora_requests[request.model]

        # Currently only support default modality specific loras
        # if we have exactly one lora matched on the request.
        if supports_default_mm_loras:
            default_mm_lora = self._get_active_default_mm_loras(request)
            if default_mm_lora is not None:
                return default_mm_lora

        if self._is_model_supported(request.model):
            return None

        # if _check_model has been called earlier, this will be unreachable
        raise ValueError(f"The model `{request.model}` does not exist.")

    def _get_message_types(self, request: AnyRequest) -> set[str]:
        """Retrieve the set of types from message content dicts up
        until `_`; we use this to match potential multimodal data
        with default per modality loras.
        """
        message_types: set[str] = set()

        if not hasattr(request, "messages"):
            return message_types

        messages = request.messages
        if messages is None or isinstance(messages, (str, bytes)):
            return message_types

        for message in messages:
            if (
                isinstance(message, dict)
                and "content" in message
                and isinstance(message["content"], list)
            ):
                for content_dict in message["content"]:
                    if "type" in content_dict:
                        message_types.add(content_dict["type"].split("_")[0])
        return message_types

    def _validate_chat_template(
        self,
        request_chat_template: str | None,
        chat_template_kwargs: dict[str, Any] | None,
        trust_request_chat_template: bool,
    ) -> ErrorResponse | None:
        if not trust_request_chat_template and (
            request_chat_template is not None
            or (
                chat_template_kwargs
                and chat_template_kwargs.get("chat_template") is not None
            )
        ):
            return self.create_error_response(
                "Chat template is passed with request, but "
                "--trust-request-chat-template is not set. "
                "Refused request with untrusted chat template."
            )
        return None

    @staticmethod
    def _prepare_extra_chat_template_kwargs(
        request_chat_template_kwargs: dict[str, Any] | None = None,
        default_chat_template_kwargs: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Helper to merge server-default and request-specific chat template kwargs."""
        request_chat_template_kwargs = request_chat_template_kwargs or {}
        if default_chat_template_kwargs is None:
            return request_chat_template_kwargs
        # Apply server defaults first, then request kwargs override.
        return default_chat_template_kwargs | request_chat_template_kwargs

    def _extract_prompt_components(self, prompt: PromptType | EngineInput):
        return extract_prompt_components(self.model_config, prompt)

    def _extract_prompt_text(self, prompt: PromptType | EngineInput):
        return self._extract_prompt_components(prompt).text

    def _extract_prompt_len(self, prompt: EngineInput):
        return extract_prompt_len(self.model_config, prompt)

    def _log_inputs(
        self,
        request_id: str,
        inputs: PromptType | EngineInput,
        params: SamplingParams | BeamSearchParams | None,
        lora_request: LoRARequest | None,
    ) -> None:
        if self.request_logger is None:
            return

        components = self._extract_prompt_components(inputs)

        self.request_logger.log_inputs(
            request_id,
            components.text,
            components.token_ids,
            components.embeds,
            params=params,
            lora_request=lora_request,
        )

    async def _get_trace_headers(
        self,
        headers: Headers,
    ) -> Mapping[str, str] | None:
        is_tracing_enabled = await self.engine_client.is_tracing_enabled()

        if is_tracing_enabled:
            return extract_trace_headers(headers)

        if contains_trace_headers(headers):
            log_tracing_disabled_warning()

        return None

    @staticmethod
    def _base_request_id(
        raw_request: Request | None, default: str | None = None
    ) -> str | None:
        """Pulls the request id to use from a header, if provided"""
        if raw_request is not None and (
            (req_id := raw_request.headers.get("X-Request-Id")) is not None
        ):
            return req_id

        return random_uuid() if default is None else default

    @staticmethod
    def _get_data_parallel_rank(raw_request: Request | None) -> int | None:
        """Pulls the data parallel rank from a header, if provided"""
        if raw_request is None:
            return None

        rank_str = raw_request.headers.get("X-data-parallel-rank")
        if rank_str is None:
            return None

        try:
            return int(rank_str)
        except ValueError:
            return None

    async def _with_kv_transfer_rejection_cleanup(
        self,
        awaitable: Awaitable[_T],
        request: ChatCompletionRequest | CompletionRequest | ResponsesRequest,
        raw_request: Request | None,
    ) -> _T:
        """Wrap a `create_*` coroutine so that, if it raises or returns an
        ErrorResponse (i.e. the request never reached the engine), the KV
        connector is notified to free any pinned remote-prefill blocks."""
        kv_transfer_params = self.has_kv_connector and request.kv_transfer_params
        if not kv_transfer_params or not kv_transfer_params.get("do_remote_prefill"):
            return await awaitable

        notify = True
        try:
            result = await awaitable
            if not isinstance(result, ErrorResponse):
                notify = False
            return result
        finally:
            if notify:
                try:
                    await self.engine_client.notify_kv_transfer_request_rejected(
                        request.request_id,
                        kv_transfer_params,
                        data_parallel_rank=self._get_data_parallel_rank(raw_request),
                    )
                except Exception:
                    logger.warning(
                        "Failed to notify KV connector about rejected request %s",
                        request.request_id,
                        exc_info=True,
                    )

    @staticmethod
    def _get_decoded_token(
        logprob: Logprob,
        token_id: int,
        tokenizer: TokenizerLike | None,
        return_as_token_id: bool = False,
    ) -> str:
        if return_as_token_id:
            return f"token_id:{token_id}"

        if logprob.decoded_token is not None:
            return logprob.decoded_token

        if tokenizer is None:
            raise ValueError(
                "Unable to get tokenizer because `skip_tokenizer_init=True`"
            )

        return tokenizer.decode([token_id])

    def _is_model_supported(self, model_name: str | None) -> bool:
        if not model_name:
            return True
        if envs.VLLM_SKIP_MODEL_NAME_VALIDATION:
            return True
        return self.models.is_base_model(model_name)

_base_request_id staticmethod

_base_request_id(
    raw_request: Request | None, default: str | None = None
) -> str | None

Pulls the request id to use from a header, if provided

Source code in vllm/entrypoints/openai/engine/serving.py
@staticmethod
def _base_request_id(
    raw_request: Request | None, default: str | None = None
) -> str | None:
    """Pulls the request id to use from a header, if provided"""
    if raw_request is not None and (
        (req_id := raw_request.headers.get("X-Request-Id")) is not None
    ):
        return req_id

    return random_uuid() if default is None else default

_convert_generation_error_to_streaming_response

_convert_generation_error_to_streaming_response(
    e: GenerationError,
) -> str

Convert GenerationError to streaming error response.

Source code in vllm/entrypoints/openai/engine/serving.py
def _convert_generation_error_to_streaming_response(
    self, e: GenerationError
) -> str:
    """Convert GenerationError to streaming error response."""
    return self.create_streaming_error_response(
        str(e),
        err_type="InternalServerError",
        status_code=e.status_code,
    )

_get_active_default_mm_loras

_get_active_default_mm_loras(
    request: AnyRequest,
) -> LoRARequest | None

Determine if there are any active default multimodal loras.

Source code in vllm/entrypoints/openai/engine/serving.py
def _get_active_default_mm_loras(self, request: AnyRequest) -> LoRARequest | None:
    """Determine if there are any active default multimodal loras."""
    # TODO: Currently this is only enabled for chat completions
    # to be better aligned with only being enabled for .generate
    # when run offline. It would be nice to support additional
    # tasks types in the future.
    message_types = self._get_message_types(request)
    default_mm_loras = set()

    for lora in self.models.lora_requests.values():
        # Best effort match for default multimodal lora adapters;
        # There is probably a better way to do this, but currently
        # this matches against the set of 'types' in any content lists
        # up until '_', e.g., to match audio_url -> audio
        if lora.lora_name in message_types:
            default_mm_loras.add(lora)

    # Currently only support default modality specific loras if
    # we have exactly one lora matched on the request.
    if len(default_mm_loras) == 1:
        return default_mm_loras.pop()
    return None

_get_data_parallel_rank staticmethod

_get_data_parallel_rank(
    raw_request: Request | None,
) -> int | None

Pulls the data parallel rank from a header, if provided

Source code in vllm/entrypoints/openai/engine/serving.py
@staticmethod
def _get_data_parallel_rank(raw_request: Request | None) -> int | None:
    """Pulls the data parallel rank from a header, if provided"""
    if raw_request is None:
        return None

    rank_str = raw_request.headers.get("X-data-parallel-rank")
    if rank_str is None:
        return None

    try:
        return int(rank_str)
    except ValueError:
        return None

_get_message_types

_get_message_types(request: AnyRequest) -> set[str]

Retrieve the set of types from message content dicts up until _; we use this to match potential multimodal data with default per modality loras.

Source code in vllm/entrypoints/openai/engine/serving.py
def _get_message_types(self, request: AnyRequest) -> set[str]:
    """Retrieve the set of types from message content dicts up
    until `_`; we use this to match potential multimodal data
    with default per modality loras.
    """
    message_types: set[str] = set()

    if not hasattr(request, "messages"):
        return message_types

    messages = request.messages
    if messages is None or isinstance(messages, (str, bytes)):
        return message_types

    for message in messages:
        if (
            isinstance(message, dict)
            and "content" in message
            and isinstance(message["content"], list)
        ):
            for content_dict in message["content"]:
                if "type" in content_dict:
                    message_types.add(content_dict["type"].split("_")[0])
    return message_types

_prepare_extra_chat_template_kwargs staticmethod

_prepare_extra_chat_template_kwargs(
    request_chat_template_kwargs: dict[str, Any]
    | None = None,
    default_chat_template_kwargs: dict[str, Any]
    | None = None,
) -> dict[str, Any]

Helper to merge server-default and request-specific chat template kwargs.

Source code in vllm/entrypoints/openai/engine/serving.py
@staticmethod
def _prepare_extra_chat_template_kwargs(
    request_chat_template_kwargs: dict[str, Any] | None = None,
    default_chat_template_kwargs: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Helper to merge server-default and request-specific chat template kwargs."""
    request_chat_template_kwargs = request_chat_template_kwargs or {}
    if default_chat_template_kwargs is None:
        return request_chat_template_kwargs
    # Apply server defaults first, then request kwargs override.
    return default_chat_template_kwargs | request_chat_template_kwargs

_raise_if_error

_raise_if_error(
    finish_reason: str | None, request_id: str
) -> None

Raise GenerationError if finish_reason indicates an error.

Source code in vllm/entrypoints/openai/engine/serving.py
def _raise_if_error(self, finish_reason: str | None, request_id: str) -> None:
    """Raise GenerationError if finish_reason indicates an error."""
    if finish_reason == "error":
        logger.error(
            "Request %s failed with an internal error during generation",
            request_id,
        )
        raise GenerationError("Internal server error")

_with_kv_transfer_rejection_cleanup async

_with_kv_transfer_rejection_cleanup(
    awaitable: Awaitable[_T],
    request: ChatCompletionRequest
    | CompletionRequest
    | ResponsesRequest,
    raw_request: Request | None,
) -> _T

Wrap a create_* coroutine so that, if it raises or returns an ErrorResponse (i.e. the request never reached the engine), the KV connector is notified to free any pinned remote-prefill blocks.

Source code in vllm/entrypoints/openai/engine/serving.py
async def _with_kv_transfer_rejection_cleanup(
    self,
    awaitable: Awaitable[_T],
    request: ChatCompletionRequest | CompletionRequest | ResponsesRequest,
    raw_request: Request | None,
) -> _T:
    """Wrap a `create_*` coroutine so that, if it raises or returns an
    ErrorResponse (i.e. the request never reached the engine), the KV
    connector is notified to free any pinned remote-prefill blocks."""
    kv_transfer_params = self.has_kv_connector and request.kv_transfer_params
    if not kv_transfer_params or not kv_transfer_params.get("do_remote_prefill"):
        return await awaitable

    notify = True
    try:
        result = await awaitable
        if not isinstance(result, ErrorResponse):
            notify = False
        return result
    finally:
        if notify:
            try:
                await self.engine_client.notify_kv_transfer_request_rejected(
                    request.request_id,
                    kv_transfer_params,
                    data_parallel_rank=self._get_data_parallel_rank(raw_request),
                )
            except Exception:
                logger.warning(
                    "Failed to notify KV connector about rejected request %s",
                    request.request_id,
                    exc_info=True,
                )