import json from typing import AsyncGenerator, Optional, List import httpx import logging from transformers import AutoTokenizer from components.llm.utils import convert_to_openai_format from components.llm.common import ChatRequest, LlmParams, LlmApi, LlmPredictParams logging.basicConfig( level=logging.DEBUG, format="%(asctime)s - %(message)s", ) class DeepInfraApi(LlmApi): """ Класс для работы с API vllm. """ def __init__(self, params: LlmParams): super().__init__() super().set_params(params) print('Tokenizer initialization.') # self.tokenizer = AutoTokenizer.from_pretrained(params.tokenizer if params.tokenizer is not None else params.model) print(f"Tokenizer initialized for model {params.model}.") async def get_models(self) -> List[str]: """ Выполняет GET-запрос к API для получения списка доступных моделей. Возвращает: list[str]: Список идентификаторов моделей. Если произошла ошибка или данные недоступны, возвращается пустой список. Исключения: Все ошибки HTTP-запросов логируются в консоль, но не выбрасываются дальше. """ try: async with httpx.AsyncClient() as client: response = await client.get(f"{self.params.url}/v1/openai/models", headers=super().create_headers()) if response.status_code == 200: json_data = response.json() return [item['id'] for item in json_data.get('data', [])] except httpx.RequestError as error: print('Error fetching models:', error) return [] def create_messages(self, prompt: str, system_prompt: str = None) -> List[dict]: """ Создает сообщения для LLM на основе переданного промпта и системного промпта (если он задан). Args: prompt (str): Пользовательский промпт. Returns: list[dict]: Список сообщений с ролями и содержимым. """ actual_prompt = self.apply_llm_template_to_prompt(prompt) messages = [] if system_prompt is not None: messages.append({"role": "system", "content": system_prompt}) else: if self.params.predict_params and self.params.predict_params.system_prompt: messages.append({"role": "system", "content": self.params.predict_params.system_prompt}) messages.append({"role": "user", "content": actual_prompt}) return messages def apply_llm_template_to_prompt(self, prompt: str) -> str: """ Применяет шаблон LLM к переданному промпту, если он задан. Args: prompt (str): Пользовательский промпт. Returns: str: Промпт с примененным шаблоном (или оригинальный, если шаблон отсутствует). """ actual_prompt = prompt if self.params.template is not None: actual_prompt = self.params.template.replace("{{PROMPT}}", actual_prompt) return actual_prompt async def tokenize(self, prompt: str) -> Optional[dict]: """ Токенизирует входной текстовый промпт. Args: prompt (str): Текст, который нужно токенизировать. Returns: dict: Словарь с токенами и их количеством или None в случае ошибки. """ try: tokens = self.tokenizer.encode(prompt, add_special_tokens=True) return {"result": tokens, "num_tokens": len(tokens), "max_length": self.params.context_length} except Exception as e: print(f"Tokenization error: {e}") return None async def detokenize(self, tokens: List[int]) -> Optional[str]: """ Детокенизирует список токенов обратно в строку. Args: tokens (List[int]): Список токенов, который нужно преобразовать в текст. Returns: str: Восстановленный текст или None в случае ошибки. """ try: text = self.tokenizer.decode(tokens, skip_special_tokens=True) return text except Exception as e: print(f"Detokenization error: {e}") return None def create_chat_request(self, chat_request: ChatRequest, system_prompt, params: LlmPredictParams) -> dict: """ Создает запрос для предсказания на основе параметров LLM. Args: prompt (str): Промпт для запроса. Returns: dict: Словарь с параметрами для выполнения запроса. """ request = { "stream": False, "model": self.params.model, } predict_params = params if predict_params: if predict_params.stop: non_empty_stop = list(filter(lambda o: o != "", predict_params.stop)) if non_empty_stop: request["stop"] = non_empty_stop if predict_params.n_predict is not None: request["max_tokens"] = int(predict_params.n_predict or 0) request["temperature"] = float(predict_params.temperature or 0) if predict_params.top_k is not None: request["top_k"] = int(predict_params.top_k) if predict_params.top_p is not None: request["top_p"] = float(predict_params.top_p) if predict_params.min_p is not None: request["min_p"] = float(predict_params.min_p) if predict_params.seed is not None: request["seed"] = int(predict_params.seed) if predict_params.n_keep is not None: request["n_keep"] = int(predict_params.n_keep) if predict_params.cache_prompt is not None: request["cache_prompt"] = bool(predict_params.cache_prompt) if predict_params.repeat_penalty is not None: request["repetition_penalty"] = float(predict_params.repeat_penalty) if predict_params.repeat_last_n is not None: request["repeat_last_n"] = int(predict_params.repeat_last_n) if predict_params.presence_penalty is not None: request["presence_penalty"] = float(predict_params.presence_penalty) if predict_params.frequency_penalty is not None: request["frequency_penalty"] = float(predict_params.frequency_penalty) request["messages"] = convert_to_openai_format(chat_request, system_prompt) return request async def create_request(self, prompt: str, system_prompt: str = None) -> dict: """ Создает запрос для предсказания на основе параметров LLM. Args: prompt (str): Промпт для запроса. Returns: dict: Словарь с параметрами для выполнения запроса. """ request = { "stream": False, "model": self.params.model, } predict_params = self.params.predict_params if predict_params: if predict_params.stop: non_empty_stop = list(filter(lambda o: o != "", predict_params.stop)) if non_empty_stop: request["stop"] = non_empty_stop if predict_params.n_predict is not None: request["max_tokens"] = int(predict_params.n_predict or 0) request["temperature"] = float(predict_params.temperature or 0) if predict_params.top_k is not None: request["top_k"] = int(predict_params.top_k) if predict_params.top_p is not None: request["top_p"] = float(predict_params.top_p) if predict_params.min_p is not None: request["min_p"] = float(predict_params.min_p) if predict_params.seed is not None: request["seed"] = int(predict_params.seed) if predict_params.n_keep is not None: request["n_keep"] = int(predict_params.n_keep) if predict_params.cache_prompt is not None: request["cache_prompt"] = bool(predict_params.cache_prompt) if predict_params.repeat_penalty is not None: request["repetition_penalty"] = float(predict_params.repeat_penalty) if predict_params.repeat_last_n is not None: request["repeat_last_n"] = int(predict_params.repeat_last_n) if predict_params.presence_penalty is not None: request["presence_penalty"] = float(predict_params.presence_penalty) if predict_params.frequency_penalty is not None: request["frequency_penalty"] = float(predict_params.frequency_penalty) request["messages"] = self.create_messages(prompt, system_prompt) return request async def trim_sources(self, sources: str, user_request: str, system_prompt: str = None) -> dict: raise NotImplementedError("This function is not supported.") async def predict_chat(self, request: ChatRequest, system_prompt, params: LlmPredictParams) -> str: """ Выполняет запрос к API и возвращает результат. Args: prompt (str): Входной текст для предсказания. Returns: str: Сгенерированный текст. """ async with httpx.AsyncClient() as client: request = self.create_chat_request(request, system_prompt, params) response = await client.post(f"{self.params.url}/v1/openai/chat/completions", headers=super().create_headers(), json=request, timeout=httpx.Timeout(connect=5.0, read=60.0, write=180, pool=10)) if response.status_code == 200: return response.json()["choices"][0]["message"]["content"] else: logging.error(f"Request failed: status code {response.status_code}") logging.error(response.text) async def predict_chat_stream(self, request: ChatRequest, system_prompt, params: LlmPredictParams) -> str: """ Выполняет запрос к API с поддержкой потокового вывода (SSE) и возвращает результат. Args: prompt (str): Входной текст для предсказания. Returns: str: Сгенерированный текст. """ async with httpx.AsyncClient() as client: request = self.create_chat_request(request, system_prompt, params) request["stream"] = True print(super().create_headers()) async with client.stream("POST", f"{self.params.url}/v1/openai/chat/completions", json=request, headers=super().create_headers()) as response: if response.status_code != 200: # Если ошибка, читаем ответ для получения подробностей error_content = await response.aread() raise Exception(f"API error: {error_content.decode('utf-8')}") # Для хранения результата generated_text = "" # Асинхронное чтение построчно async for line in response.aiter_lines(): if line.startswith("data: "): # SSE-сообщения начинаются с "data: " try: # Парсим JSON из строки data = json.loads(line[len("data: "):].strip()) if data == "[DONE]": # Конец потока break if "choices" in data and data["choices"]: # Получаем текст из текущего токена token_value = data["choices"][0].get("delta", {}).get("content", "") generated_text += token_value except json.JSONDecodeError: continue # Игнорируем строки, которые не удается декодировать return generated_text.strip() async def get_predict_chat_generator(self, request: ChatRequest, system_prompt: str, params: LlmPredictParams) -> AsyncGenerator[str, None]: """ Выполняет потоковый запрос к API и возвращает токены по мере их генерации. Args: request (ChatRequest): История чата. system_prompt (str): Системный промпт. params (LlmPredictParams): Параметры предсказания. Yields: str: Токены ответа LLM. """ params async with httpx.AsyncClient() as client: request_data = self.create_chat_request(request, system_prompt, params) request_data["stream"] = True async with client.stream( "POST", f"{self.params.url}/v1/openai/chat/completions", json=request_data, headers=super().create_headers() ) as response: if response.status_code != 200: error_content = await response.aread() raise Exception(f"API error: {error_content.decode('utf-8')}") async for line in response.aiter_lines(): if line.startswith("data: "): try: data = json.loads(line[len("data: "):].strip()) if data == "[DONE]": break if "choices" in data and data["choices"]: token_value = data["choices"][0].get("delta", {}).get("content", "") if token_value: yield token_value except json.JSONDecodeError: continue async def predict(self, prompt: str, system_prompt: str) -> str: """ Выполняет запрос к API и возвращает результат. Args: prompt (str): Входной текст для предсказания. Returns: str: Сгенерированный текст. """ async with httpx.AsyncClient() as client: request = await self.create_request(prompt, system_prompt) response = await client.post(f"{self.params.url}/v1/openai/chat/completions", headers=super().create_headers(), json=request, timeout=httpx.Timeout(connect=5.0, read=60.0, write=180, pool=10)) if response.status_code == 200: return response.json()["choices"][0]["message"]["content"] else: logging.info(f"Request {prompt} failed: status code {response.status_code}") logging.info(response.text) async def trim_prompt(self, prompt: str, system_prompt: str = None): result = await self.tokenize(prompt) result_system = None system_prompt_length = 0 if system_prompt is not None: result_system = await self.tokenize(system_prompt) if result_system is not None: system_prompt_length = len(result_system["result"]) # в случае ошибки при токенизации, вернем исходную строку безопасной длины if result["result"] is None or (system_prompt is not None and result_system is None): return prompt[int(self.params.context_length / 3)] #вероятно, часть уходит на форматирование чата, надо проверить max_length = result["max_length"] - len(result["result"]) - system_prompt_length - self.params.predict_params.n_predict detokenized_str = await self.detokenize(result["result"][:max_length]) # в случае ошибки при детокенизации, вернем исходную строку безопасной длины if detokenized_str is None: return prompt[self.params.context_length / 3] return detokenized_str