import logging from typing import Callable, Optional from uuid import UUID from ntr_fileparser import ParsedDocument from ntr_text_fragmentation import (EntitiesExtractor, InjectionBuilder, LinkerEntity) import numpy as np from common.configuration import Configuration from components.dbo.chunk_repository import ChunkRepository from components.embedding_extraction import EmbeddingExtractor from components.nmd.faiss_vector_search import FaissVectorSearch logger = logging.getLogger(__name__) class EntityService: """ Сервис для работы с сущностями. Объединяет функциональность chunk_repository, destructurer, injection_builder и faiss_vector_search. """ def __init__( self, vectorizer: EmbeddingExtractor, chunk_repository: ChunkRepository, config: Configuration, ) -> None: """ Инициализация сервиса. Args: vectorizer: Модель для извлечения эмбеддингов chunk_repository: Репозиторий для работы с чанками config: Конфигурация приложения """ self.vectorizer = vectorizer self.config = config self.chunk_repository = chunk_repository self.faiss_search = None # Инициализируется при необходимости self.current_dataset_id = None # Текущий dataset_id self.neighbors_max_distance = config.db_config.entities.neighbors_max_distance self.max_entities_per_message = config.db_config.search.max_entities_per_message self.max_entities_per_dialogue = config.db_config.search.max_entities_per_dialogue self.entities_extractor = EntitiesExtractor( strategy_name=config.db_config.entities.strategy_name, strategy_params=config.db_config.entities.strategy_params, process_tables=config.db_config.entities.process_tables, ) def _ensure_faiss_initialized(self, dataset_id: int) -> None: """ Проверяет и при необходимости инициализирует или обновляет FAISS индекс. Args: dataset_id: ID датасета для инициализации """ # Если индекс не инициализирован или датасет изменился if self.faiss_search is None or self.current_dataset_id != dataset_id: logger.info(f'Initializing FAISS for dataset {dataset_id}') entities, embeddings = self.chunk_repository.get_searching_entities( dataset_id ) if entities: # Создаем словарь только из не-None эмбеддингов embeddings_dict = { str(entity.id): embedding # Преобразуем UUID в строку для ключа for entity, embedding in zip(entities, embeddings) if embedding is not None } if embeddings_dict: # Проверяем, что есть хотя бы один эмбеддинг self.faiss_search = FaissVectorSearch( self.vectorizer, embeddings_dict, ) self.current_dataset_id = dataset_id logger.info( f'FAISS initialized for dataset {dataset_id} with {len(embeddings_dict)} embeddings' ) else: logger.warning( f'No valid embeddings found for dataset {dataset_id}' ) self.faiss_search = None self.current_dataset_id = None else: logger.warning(f'No entities found for dataset {dataset_id}') self.faiss_search = None self.current_dataset_id = None def process_document( self, document: ParsedDocument, dataset_id: int, progress_callback: Optional[Callable] = None, ) -> None: """ Обработка документа: разбиение на чанки и сохранение в базу. Args: document: Документ для обработки dataset_id: ID датасета progress_callback: Функция для отслеживания прогресса """ logger.info(f"Processing document {document.name} for dataset {dataset_id}") # Получаем сущности entities = self.entities_extractor.extract(document) # Фильтруем сущности для поиска filtering_entities = [ entity for entity in entities if entity.in_search_text is not None ] filtering_texts = [entity.in_search_text for entity in filtering_entities] # Получаем эмбеддинги с поддержкой callback embeddings = self.vectorizer.vectorize(filtering_texts, progress_callback) embeddings_dict = { str(entity.id): embedding # Преобразуем UUID в строку для ключа for entity, embedding in zip(filtering_entities, embeddings) } # Сохраняем в базу self.chunk_repository.add_entities(entities, dataset_id, embeddings_dict) logger.info(f"Added {len(entities)} entities to dataset {dataset_id}") def build_text( self, entities: list[str], chunk_scores: Optional[list[float]] = None, include_tables: bool = True, max_documents: Optional[int] = None, ) -> str: """ Сборка текста из сущностей. Args: entities: Список идентификаторов сущностей chunk_scores: Список весов чанков include_tables: Флаг включения таблиц max_documents: Максимальное количество документов Returns: Собранный текст """ entities = [UUID(entity) for entity in entities] entities = self.chunk_repository.get_entities_by_ids(entities) logger.info(f"Building text for {len(entities)} entities") if chunk_scores is not None: chunk_scores = { entity.id: score for entity, score in zip(entities, chunk_scores) } builder = InjectionBuilder(self.chunk_repository) return builder.build( entities, scores=chunk_scores, include_tables=include_tables, neighbors_max_distance=self.neighbors_max_distance, max_documents=max_documents, ) def search_similar_old( self, query: str, dataset_id: int, ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: """ Поиск похожих сущностей. Args: query: Текст запроса dataset_id: ID датасета Returns: tuple[np.ndarray, np.ndarray, np.ndarray]: - Вектор запроса - Оценки сходства - Идентификаторы найденных сущностей """ # Убеждаемся, что FAISS инициализирован для текущего датасета self._ensure_faiss_initialized(dataset_id) if self.faiss_search is None: return np.array([]), np.array([]), np.array([]) # Выполняем поиск return self.faiss_search.search_vectors(query) def search_similar( self, query: str, dataset_id: int, previous_entities: list[list[str]] = None, ) -> tuple[list[list[str]], list[str], list[float]]: """ Поиск похожих сущностей. Args: query: Текст запроса dataset_id: ID датасета previous_entities: Список идентификаторов сущностей, которые уже были найдены Returns: tuple[list[list[str]], list[str], list[float]]: - Перефильтрованный список идентификаторов сущностей из прошлых запросов - Список идентификаторов найденных сущностей - Скоры найденных сущностей """ self._ensure_faiss_initialized(dataset_id) if self.faiss_search is None: return previous_entities, [], [] if sum(len(entities) for entities in previous_entities) < self.max_entities_per_dialogue - self.max_entities_per_message: _, scores, ids = self.faiss_search.search_vectors(query, self.max_entities_per_message) try: scores = scores.tolist() ids = ids.tolist() except: scores = list(scores) ids = list(ids) return previous_entities, ids, scores if previous_entities: _, scores, ids = self.faiss_search.search_vectors(query, self.max_entities_per_dialogue) scores = scores.tolist() ids = ids.tolist() print(ids) previous_entities_ids = [[entity for entity in sublist if entity in ids] for sublist in previous_entities] previous_entities_flat = [entity for sublist in previous_entities_ids for entity in sublist] new_entities = [] new_scores = [] for id_, score in zip(ids, scores): if id_ not in previous_entities_flat: new_entities.append(id_) new_scores.append(score) if len(new_entities) >= self.max_entities_per_message: break return previous_entities, new_entities, new_scores else: _, scores, ids = self.faiss_search.search_vectors(query, self.max_entities_per_dialogue) scores = scores.tolist() ids = ids.tolist() return [], ids, scores