import logging from typing import Callable, Optional from uuid import UUID import numpy as np from ntr_fileparser import ParsedDocument from ntr_text_fragmentation import Destructurer, InjectionBuilder, LinkerEntity from common.configuration import Configuration from components.dbo.chunk_repository import ChunkRepository from components.embedding_extraction import EmbeddingExtractor from components.nmd.faiss_vector_search import FaissVectorSearch logger = logging.getLogger(__name__) class EntityService: """ Сервис для работы с сущностями. Объединяет функциональность chunk_repository, destructurer, injection_builder и faiss_vector_search. """ def __init__( self, vectorizer: EmbeddingExtractor, chunk_repository: ChunkRepository, config: Configuration, ) -> None: """ Инициализация сервиса. Args: vectorizer: Модель для извлечения эмбеддингов chunk_repository: Репозиторий для работы с чанками config: Конфигурация приложения """ self.vectorizer = vectorizer self.config = config self.chunk_repository = chunk_repository self.faiss_search = None # Инициализируется при необходимости self.current_dataset_id = None # Текущий dataset_id def _ensure_faiss_initialized(self, dataset_id: int) -> None: """ Проверяет и при необходимости инициализирует или обновляет FAISS индекс. Args: dataset_id: ID датасета для инициализации """ # Если индекс не инициализирован или датасет изменился if self.faiss_search is None or self.current_dataset_id != dataset_id: logger.info(f'Initializing FAISS for dataset {dataset_id}') entities, embeddings = self.chunk_repository.get_searching_entities(dataset_id) if entities: # Создаем словарь только из не-None эмбеддингов embeddings_dict = { str(entity.id): embedding # Преобразуем UUID в строку для ключа for entity, embedding in zip(entities, embeddings) if embedding is not None } if embeddings_dict: # Проверяем, что есть хотя бы один эмбеддинг self.faiss_search = FaissVectorSearch( self.vectorizer, embeddings_dict, self.config.db_config, ) self.current_dataset_id = dataset_id logger.info(f'FAISS initialized for dataset {dataset_id} with {len(embeddings_dict)} embeddings') else: logger.warning(f'No valid embeddings found for dataset {dataset_id}') self.faiss_search = None self.current_dataset_id = None else: logger.warning(f'No entities found for dataset {dataset_id}') self.faiss_search = None self.current_dataset_id = None def process_document( self, document: ParsedDocument, dataset_id: int, progress_callback: Optional[Callable] = None, **destructurer_kwargs, ) -> None: """ Обработка документа: разбиение на чанки и сохранение в базу. Args: document: Документ для обработки dataset_id: ID датасета progress_callback: Функция для отслеживания прогресса **destructurer_kwargs: Дополнительные параметры для Destructurer """ logger.info(f"Processing document {document.name} for dataset {dataset_id}") # Создаем деструктуризатор с параметрами по умолчанию destructurer = Destructurer( document, strategy_name="fixed_size", process_tables=True, **{ "words_per_chunk": 50, "overlap_words": 25, "respect_sentence_boundaries": True, **destructurer_kwargs, } ) # Получаем сущности entities = destructurer.destructure() # Фильтруем сущности для поиска filtering_entities = [entity for entity in entities if entity.in_search_text is not None] filtering_texts = [entity.in_search_text for entity in filtering_entities] # Получаем эмбеддинги с поддержкой callback embeddings = self.vectorizer.vectorize(filtering_texts, progress_callback) embeddings_dict = { str(entity.id): embedding # Преобразуем UUID в строку для ключа for entity, embedding in zip(filtering_entities, embeddings) } # Сохраняем в базу self.chunk_repository.add_entities(entities, dataset_id, embeddings_dict) # Переинициализируем FAISS индекс, если это текущий датасет if self.current_dataset_id == dataset_id: self._ensure_faiss_initialized(dataset_id) logger.info(f"Added {len(entities)} entities to dataset {dataset_id}") def build_text( self, entities: list[LinkerEntity], chunk_scores: Optional[list[float]] = None, include_tables: bool = True, max_documents: Optional[int] = None, ) -> str: """ Сборка текста из сущностей. Args: entities: Список сущностей chunk_scores: Список весов чанков include_tables: Флаг включения таблиц max_documents: Максимальное количество документов Returns: Собранный текст """ logger.info(f"Building text for {len(entities)} entities") if chunk_scores is not None: chunk_scores = {entity.id: score for entity, score in zip(entities, chunk_scores)} builder = InjectionBuilder(self.chunk_repository) return builder.build( [entity.id for entity in entities], # Передаем UUID напрямую chunk_scores=chunk_scores, include_tables=include_tables, max_documents=max_documents, ) def search_similar( self, query: str, dataset_id: int, ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: """ Поиск похожих сущностей. Args: query: Текст запроса dataset_id: ID датасета Returns: tuple[np.ndarray, np.ndarray, np.ndarray]: - Вектор запроса - Оценки сходства - Идентификаторы найденных сущностей """ # Убеждаемся, что FAISS инициализирован для текущего датасета self._ensure_faiss_initialized(dataset_id) if self.faiss_search is None: return np.array([]), np.array([]), np.array([]) # Выполняем поиск return self.faiss_search.search_vectors(query) def add_neighboring_chunks( self, entities: list[LinkerEntity], max_distance: int = 1, ) -> list[LinkerEntity]: """ Добавление соседних чанков. Args: entities: Список сущностей max_distance: Максимальное расстояние для поиска соседей Returns: Расширенный список сущностей """ # Убедимся, что все ID представлены в UUID формате for entity in entities: if not isinstance(entity.id, UUID): entity.id = UUID(str(entity.id)) builder = InjectionBuilder(self.chunk_repository) return builder.add_neighboring_chunks(entities, max_distance)