#!/usr/bin/env python # -*- coding: utf-8 -*- """ Скрипт для сравнения результатов InjectionBuilder при использовании ChunkRepository (SQLite) и InMemoryEntityRepository (предзагруженного из SQLite). """ import logging import random import sys from pathlib import Path from uuid import UUID # --- SQLAlchemy --- from sqlalchemy import and_, create_engine, select from sqlalchemy.orm import sessionmaker # --- Конфигурация --- # !!! ЗАМЕНИ НА АКТУАЛЬНЫЙ ПУТЬ К ТВОЕЙ БД НА СЕРВЕРЕ !!! DATABASE_URL = "sqlite:///../data/logs.db" # Пример пути, используй свой # Имя таблицы сущностей ENTITY_TABLE_NAME = "entity" # Исправь, если нужно # Количество случайных чанков для теста SAMPLE_SIZE = 300 # --- Настройка путей для импорта --- SCRIPT_DIR = Path(__file__).parent.resolve() PROJECT_ROOT = SCRIPT_DIR.parent # Перейти на уровень вверх (scripts -> project root) LIB_EXTRACTOR_PATH = PROJECT_ROOT / "lib" / "extractor" COMPONENTS_PATH = PROJECT_ROOT / "components" # Путь к компонентам sys.path.insert(0, str(PROJECT_ROOT)) sys.path.insert(0, str(LIB_EXTRACTOR_PATH)) sys.path.insert(0, str(COMPONENTS_PATH)) # Добавляем путь к ntr_text_fragmentation внутри lib/extractor sys.path.insert(0, str(LIB_EXTRACTOR_PATH / "ntr_text_fragmentation")) # --- Логирование --- logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # --- Импорты из проекта и библиотеки --- try: # Модели БД from ntr_text_fragmentation.core.entity_repository import \ InMemoryEntityRepository # Импортируем InMemory Repo from ntr_text_fragmentation.core.injection_builder import \ InjectionBuilder # Импортируем Builder # Модели сущностей from ntr_text_fragmentation.models import (Chunk, DocumentAsEntity, LinkerEntity) # Репозитории и билдер from components.dbo.chunk_repository import \ ChunkRepository # Импортируем ChunkRepository from components.dbo.models.acronym import \ Acronym # Импортируем модель из проекта from components.dbo.models.dataset import \ Dataset # Импортируем модель из проекта from components.dbo.models.dataset_document import \ DatasetDocument # Импортируем модель из проекта from components.dbo.models.document import \ Document # Импортируем модель из проекта from components.dbo.models.entity import \ EntityModel # Импортируем модель из проекта # TableEntity если есть # from ntr_text_fragmentation.models.table_entity import TableEntity except ImportError as e: logger.error(f"Ошибка импорта необходимых модулей: {e}") logger.error("Убедитесь, что скрипт находится в папке scripts вашего проекта,") logger.error("и структура проекта соответствует ожиданиям (наличие lib/extractor, components/dbo и т.д.).") sys.exit(1) # --- Вспомогательная функция для парсинга вывода --- def parse_output_by_source(text: str) -> dict[str, str]: """Разбивает текст на блоки по маркерам '[Источник]'.""" blocks = {} # Разделяем текст по маркеру parts = text.split('[Источник]') # Пропускаем первую часть (текст до первого маркера или пустая строка) for part in parts[1:]: part = part.strip() # Убираем лишние пробелы вокруг части if not part: continue # Ищем первый перенос строки newline_index = part.find('\n') if newline_index != -1: # Извлекаем заголовок ( - ИмяИсточника) header = part[:newline_index].strip() # Извлекаем контент content = part[newline_index+1:].strip() # Очищаем имя источника от " - " и пробелов source_name = header.removeprefix('-').strip() if source_name: # Убедимся, что имя источника не пустое if source_name in blocks: logger.warning(f"Найден дублирующийся источник '{source_name}' при парсинге split(). Контент будет перезаписан.") blocks[source_name] = content else: logger.warning(f"Не удалось извлечь имя источника из заголовка: '{header}'") else: # Если переноса строки нет, вся часть может быть заголовком без контента? logger.warning(f"Часть без переноса строки после '[Источник]': '{part[:100]}...'") return blocks # --- Основная функция сравнения --- def compare_repositories(): logger.info(f"Подключение к базе данных: {DATABASE_URL}") try: engine = create_engine(DATABASE_URL) # Определяем модель здесь, чтобы не зависеть от Base из другого места SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) db_session = SessionLocal() # 1. Инициализация ChunkRepository (нужен для доступа к _map_db_entity_to_linker_entity) # Передаем фабрику сессий, чтобы он мог создавать свои сессии при необходимости chunk_repo = ChunkRepository(db=SessionLocal) # 2. Загрузка ВСЕХ сущностей НАПРЯМУЮ из БД logger.info("Загрузка всех сущностей из БД через сессию...") all_db_models = db_session.query(EntityModel).all() logger.info(f"Загружено {len(all_db_models)} записей EntityModel.") if not all_db_models: logger.error("Не удалось загрузить сущности из базы данных. Проверьте подключение и наличие данных.") db_session.close() return # Конвертация в LinkerEntity с использованием маппинга из ChunkRepository logger.info("Конвертация EntityModel в LinkerEntity...") all_linker_entities = [chunk_repo._map_db_entity_to_linker_entity(model) for model in all_db_models] logger.info(f"Сконвертировано в {len(all_linker_entities)} LinkerEntity объектов.") # 3. Инициализация InMemoryEntityRepository logger.info("Инициализация InMemoryEntityRepository...") in_memory_repo = InMemoryEntityRepository(entities=all_linker_entities) logger.info(f"InMemoryEntityRepository инициализирован с {len(in_memory_repo.entities)} сущностями.") # 4. Получение ID искомых чанков НАПРЯМУЮ из БД logger.info("Получение ID искомых чанков из БД через сессию...") query = select(EntityModel.uuid).where( and_( EntityModel.in_search_text.isnot(None), ) ) results = db_session.execute(query).scalars().all() searchable_chunk_ids = [UUID(res) for res in results] logger.info(f"Найдено {len(searchable_chunk_ids)} сущностей для поиска.") if not searchable_chunk_ids: logger.warning("В базе данных не найдено сущностей для поиска (с in_search_text). Тест невозможен.") db_session.close() return # 5. Выборка случайных ID чанков actual_sample_size = min(SAMPLE_SIZE, len(searchable_chunk_ids)) if actual_sample_size < len(searchable_chunk_ids): logger.info(f"Выбираем {actual_sample_size} случайных ID сущностей для поиска из {len(searchable_chunk_ids)}...") sampled_chunk_ids = random.sample(searchable_chunk_ids, actual_sample_size) else: logger.info(f"Используем все {len(searchable_chunk_ids)} найденные ID сущностей для поиска (т.к. их меньше или равно {SAMPLE_SIZE}).") sampled_chunk_ids = searchable_chunk_ids # 6. Инициализация InjectionBuilders logger.info("Инициализация InjectionBuilder для ChunkRepository...") # Передаем ИМЕННО ЭКЗЕМПЛЯР chunk_repo, который мы создали builder_chunk_repo = InjectionBuilder(repository=chunk_repo) logger.info("Инициализация InjectionBuilder для InMemoryEntityRepository...") builder_in_memory = InjectionBuilder(repository=in_memory_repo) # 7. Сборка текста для обоих репозиториев logger.info(f"\n--- Сборка текста для ChunkRepository ({actual_sample_size} ID)... ---") try: # Передаем список UUID text_chunk_repo = builder_chunk_repo.build(filtered_entities=sampled_chunk_ids) logger.info(f"Сборка для ChunkRepository завершена. Общая длина: {len(text_chunk_repo)}") # --- Добавляем вывод начала текста --- print("\n--- Начало текста (ChunkRepository, первые 1000 символов): ---") print(text_chunk_repo[:1000]) print("--- Конец начала текста (ChunkRepository) ---") # ------------------------------------- except Exception as e: logger.error(f"Ошибка при сборке с ChunkRepository: {e}", exc_info=True) text_chunk_repo = f"ERROR_ChunkRepo: {e}" logger.info(f"\n--- Сборка текста для InMemoryEntityRepository ({actual_sample_size} ID)... ---") try: # Передаем список UUID text_in_memory = builder_in_memory.build(filtered_entities=sampled_chunk_ids) logger.info(f"Сборка для InMemoryEntityRepository завершена. Общая длина: {len(text_in_memory)}") # --- Добавляем вывод начала текста --- print("\n--- Начало текста (InMemory, первые 1000 символов): ---") print(text_in_memory[:1000]) print("--- Конец начала текста (InMemory) ---") # ------------------------------------- except Exception as e: logger.error(f"Ошибка при сборке с InMemoryEntityRepository: {e}", exc_info=True) text_in_memory = f"ERROR_InMemory: {e}" # 8. Парсинг результатов по блокам logger.info("\n--- Парсинг результатов по источникам ---") blocks_chunk_repo = parse_output_by_source(text_chunk_repo) blocks_in_memory = parse_output_by_source(text_in_memory) logger.info(f"ChunkRepo: Найдено {len(blocks_chunk_repo)} блоков источников.") logger.info(f"InMemory: Найдено {len(blocks_in_memory)} блоков источников.") # 9. Сравнение блоков logger.info("\n--- Сравнение блоков по источникам ---") chunk_repo_keys = set(blocks_chunk_repo.keys()) in_memory_keys = set(blocks_in_memory.keys()) all_keys = chunk_repo_keys | in_memory_keys mismatched_blocks = [] if chunk_repo_keys != in_memory_keys: logger.warning("Наборы источников НЕ СОВПАДАЮТ!") only_in_chunk = chunk_repo_keys - in_memory_keys only_in_memory = in_memory_keys - chunk_repo_keys if only_in_chunk: logger.warning(f" Источники только в ChunkRepo: {sorted(list(only_in_chunk))}") if only_in_memory: logger.warning(f" Источники только в InMemory: {sorted(list(only_in_memory))}") else: logger.info("Наборы источников совпадают.") logger.info("\n--- Сравнение содержимого общих источников ---") common_keys = chunk_repo_keys & in_memory_keys if not common_keys: logger.warning("Нет общих источников для сравнения содержимого.") else: all_common_blocks_match = True table_marker_found_in_any_chunk_repo = False table_marker_found_in_any_in_memory = False for key in sorted(list(common_keys)): content_chunk = blocks_chunk_repo.get(key, "") # Используем .get для безопасности content_memory = blocks_in_memory.get(key, "") # Используем .get для безопасности # Проверка наличия маркера таблиц has_tables_chunk = "###" in content_chunk has_tables_memory = "###" in content_memory if has_tables_chunk: table_marker_found_in_any_chunk_repo = True if has_tables_memory: table_marker_found_in_any_in_memory = True # Логируем наличие таблиц для КАЖДОГО блока (можно закомментировать, если много) # logger.info(f" Источник: '{key}' - Таблицы (###) в ChunkRepo: {has_tables_chunk}, в InMemory: {has_tables_memory}") if content_chunk != content_memory: all_common_blocks_match = False mismatched_blocks.append(key) logger.warning(f" НЕСОВПАДЕНИЕ для источника: '{key}' (Таблицы в ChunkRepo: {has_tables_chunk}, в InMemory: {has_tables_memory})") # Можно добавить вывод diff для конкретного блока, если нужно # import difflib # block_diff = difflib.unified_diff( # content_chunk.splitlines(keepends=True), # content_memory.splitlines(keepends=True), # fromfile=f'{key}_ChunkRepo', # tofile=f'{key}_InMemory', # lineterm='', # ) # print("\nDiff для блока:") # sys.stdout.writelines(list(block_diff)[:20]) # Показать начало diff блока # if len(list(block_diff)) > 20: print("...") # else: # # Логируем совпадение только если таблицы есть хоть где-то, для краткости # if has_tables_chunk or has_tables_memory: # logger.info(f" Совпадение для источника: '{key}' (Таблицы в ChunkRepo: {has_tables_chunk}, в InMemory: {has_tables_memory})") # Выводим общую информацию о наличии таблиц logger.info("--- Итог проверки таблиц (###) в общих блоках ---") logger.info(f"Маркер таблиц '###' найден хотя бы в одном блоке ChunkRepo: {table_marker_found_in_any_chunk_repo}") logger.info(f"Маркер таблиц '###' найден хотя бы в одном блоке InMemory: {table_marker_found_in_any_in_memory}") logger.info("-------------------------------------------------") if all_common_blocks_match: logger.info("Содержимое ВСЕХ общих источников СОВПАДАЕТ.") else: logger.warning(f"Найдено НЕСОВПАДЕНИЕ содержимого для {len(mismatched_blocks)} источников: {sorted(mismatched_blocks)}") logger.info("\n--- Итоговый вердикт ---") if chunk_repo_keys == in_memory_keys and not mismatched_blocks: logger.info("ПОЛНОЕ СОВПАДЕНИЕ: Наборы источников и их содержимое идентичны.") elif chunk_repo_keys == in_memory_keys and mismatched_blocks: logger.warning("ЧАСТИЧНОЕ СОВПАДЕНИЕ: Наборы источников совпадают, но содержимое некоторых блоков различается.") else: logger.warning("НЕСОВПАДЕНИЕ: Наборы источников различаются (и, возможно, содержимое общих тоже).") except ImportError as e: # Ловим ошибки импорта, возникшие внутри функций (маловероятно после старта) logger.error(f"Критическая ошибка импорта: {e}") except Exception as e: logger.error(f"Произошла общая ошибка: {e}", exc_info=True) finally: if 'db_session' in locals() and db_session: db_session.close() logger.info("Сессия базы данных закрыта.") # --- Запуск --- if __name__ == "__main__": # Используем Path для более надежного определения пути db_path = Path(DATABASE_URL.replace("sqlite:///", "")) if not db_path.exists(): print(f"!!! ОШИБКА: Файл базы данных НЕ НАЙДЕН по пути: {db_path.resolve()} !!!") print(f"!!! Проверьте значение DATABASE_URL в скрипте. !!!") elif "путь/к/твоей" in DATABASE_URL: # Доп. проверка на placeholder print("!!! ПОЖАЛУЙСТА, УКАЖИТЕ ПРАВИЛЬНЫЙ ПУТЬ К БАЗЕ ДАННЫХ В ПЕРЕМЕННОЙ DATABASE_URL !!!") else: compare_repositories()