#!/usr/bin/env python """ Скрипт для визуального тестирования процесса чанкинга и сборки документа. Этот скрипт: 1. Считывает test_input/test.docx с помощью UniversalParser 2. Чанкит документ через Destructurer с fixed_size-стратегией 3. Сохраняет результат чанкинга в test_output/test.csv 4. Выбирает 20-30 случайных чанков из CSV 5. Создает InjectionBuilder с InMemoryEntityRepository 6. Собирает текст из выбранных чанков 7. Сохраняет результат в test_output/test_builded.txt """ import json import logging import os import random from pathlib import Path from typing import List from uuid import UUID import pandas as pd from ntr_fileparser import UniversalParser from ntr_text_fragmentation import (DocumentAsEntity, EntitiesExtractor, InjectionBuilder, InMemoryEntityRepository, LinkerEntity) def setup_logging() -> None: """Настройка логгирования.""" logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - [%(pathname)s:%(lineno)d] - %(message)s", ) def ensure_directories() -> None: """Проверка наличия необходимых директорий.""" for directory in ["test_input", "test_output"]: Path(directory).mkdir(parents=True, exist_ok=True) def save_entities_to_csv(entities: List[LinkerEntity], csv_path: str) -> None: """ Сохраняет сущности в CSV файл. Args: entities: Список сущностей csv_path: Путь для сохранения CSV файла """ data = [] for entity in entities: # Базовые поля для всех типов сущностей entity_dict = { "id": str(entity.id), "type": entity.type, "name": entity.name, "text": entity.text, "metadata": json.dumps(entity.metadata or {}, ensure_ascii=False), "in_search_text": entity.in_search_text, "source_id": str(entity.source_id) if entity.source_id else None, "target_id": str(entity.target_id) if entity.target_id else None, "number_in_relation": entity.number_in_relation, "groupper": entity.groupper, "type": entity.type, } # Дополнительные поля специфичные для подклассов (если они есть в __dict__) # Это не самый надежный способ, но для скрипта визуализации может подойти # Сериализация LinkerEntity теперь должна сама класть доп поля в metadata # for key, value in entity.__dict__.items(): # if key not in entity_dict and not key.startswith('_'): # entity_dict[key] = value data.append(entity_dict) df = pd.DataFrame(data) # Указываем кодировку UTF-8 при записи CSV df.to_csv(csv_path, index=False, encoding='utf-8') logging.info(f"Сохранено {len(entities)} сущностей в {csv_path}") def load_entities_from_csv(csv_path: str) -> List[LinkerEntity]: """ Загружает сущности из CSV файла. Args: csv_path: Путь к CSV файлу Returns: Список сущностей """ df = pd.read_csv(csv_path) entities = [] for _, row in df.iterrows(): # Обработка метаданных metadata_str = row.get("metadata", "{}") try: # Используем json.loads для парсинга JSON строки metadata = ( json.loads(metadata_str) if pd.notna(metadata_str) and metadata_str else {} ) except json.JSONDecodeError: # Ловим ошибку JSON logging.warning( f"Не удалось распарсить метаданные JSON: {metadata_str}. Используется пустой словарь." ) metadata = {} # Общие поля для всех типов сущностей # Преобразуем ID обратно в UUID entity_id = row['id'] if isinstance(entity_id, str): try: entity_id = UUID(entity_id) except ValueError: logging.warning( f"Неверный формат UUID для id: {entity_id}. Пропускаем сущность." ) continue common_args = { "id": entity_id, "name": row["name"] if pd.notna(row.get("name")) else "", "text": row["text"] if pd.notna(row.get("text")) else "", "metadata": metadata, "in_search_text": ( row["in_search_text"] if pd.notna(row.get('in_search_text')) else None ), "type": ( row["type"] if pd.notna(row.get('type')) else LinkerEntity.__name__ ), # Используем базовый тип, если не указан "groupper": row["groupper"] if pd.notna(row.get("groupper")) else None, } # Добавляем поля связи, если они есть, преобразуя в UUID source_id_str = row.get("source_id") target_id_str = row.get("target_id") if pd.notna(source_id_str): try: common_args["source_id"] = UUID(source_id_str) except ValueError: logging.warning( f"Неверный формат UUID для source_id: {source_id_str}. Пропускаем поле." ) if pd.notna(target_id_str): try: common_args["target_id"] = UUID(target_id_str) except ValueError: logging.warning( f"Неверный формат UUID для target_id: {target_id_str}. Пропускаем поле." ) if pd.notna(row.get("number_in_relation")): try: common_args["number_in_relation"] = int(row["number_in_relation"]) except ValueError: logging.warning( f"Неверный формат для number_in_relation: {row['number_in_relation']}. Пропускаем поле." ) # Пытаемся десериализовать в конкретный тип, если он известен entity_class = LinkerEntity._entity_classes.get( common_args["type"], LinkerEntity ) try: # Создаем экземпляр, передавая только те аргументы, которые ожидает класс # (используя LinkerEntity._deserialize_to_me как пример, но нужно убедиться, # что он принимает все нужные поля или имеет **kwargs) # Пока создаем базовый LinkerEntity, т.к. подклассы могут требовать специфичные поля # которых нет в CSV или в common_args entity = LinkerEntity(**common_args) # Если нужно строгое восстановление типов, потребуется более сложная логика # с проверкой полей каждого подкласса except TypeError as e: logging.warning( f"Ошибка создания экземпляра {entity_class.__name__} для ID {common_args['id']}: {e}. Создан базовый LinkerEntity." ) entity = LinkerEntity(**common_args) # Откат к базовому классу entities.append(entity) logging.info(f"Загружено {len(entities)} сущностей из {csv_path}") return entities def main() -> None: """Основная функция скрипта.""" setup_logging() ensure_directories() # Пути к файлам input_doc_path = "test_input/test2.docx" output_csv_path = "test_output/test2.csv" output_text_path = "test_output/test2.md" # Проверка наличия входного файла if not os.path.exists(input_doc_path): logging.error(f"Файл {input_doc_path} не найден!") return logging.info(f"Парсинг документа {input_doc_path}") try: # Шаг 1: Парсинг документа дважды, как если бы это были два разных документа parser = UniversalParser() document1 = parser.parse_by_path(input_doc_path) document2 = parser.parse_by_path(input_doc_path) # Меняем название второго документа, чтобы отличить его document2.name = document2.name + "_copy" if document2.name else "copy_doc" # Шаг 2: Чанкинг и извлечение таблиц с использованием EntitiesExtractor all_entities = [] # Обработка первого документа logging.info("Начало процесса деструктуризации первого документа") # Инициализируем экстрактор без документа (используем дефолтные настройки или настроим позже) extractor1 = EntitiesExtractor() # Настройка чанкинга extractor1.configure_chunking( strategy_name="fixed_size", strategy_params={ "words_per_chunk": 50, "overlap_words": 25, "respect_sentence_boundaries": True, # Добавлено по запросу }, ) # Настройка извлечения таблиц extractor1.configure_tables_extraction(process_tables=True) # Выполнение деструктуризации entities1 = extractor1.extract(document1) # Находим ID документа 1 doc1_entity = next((e for e in entities1 if e.type == DocumentAsEntity.__name__), None) if not doc1_entity: logging.error("Не удалось найти DocumentAsEntity для первого документа!") return doc1_id = doc1_entity.id logging.info(f"ID первого документа: {doc1_id}") logging.info(f"Получено {len(entities1)} сущностей из первого документа") all_entities.extend(entities1) # Обработка второго документа logging.info("Начало процесса деструктуризации второго документа") # Инициализируем экстрактор без документа extractor2 = EntitiesExtractor() # Настройка чанкинга (те же параметры) extractor2.configure_chunking( strategy_name="fixed_size", strategy_params={ "words_per_chunk": 50, "overlap_words": 25, "respect_sentence_boundaries": True, }, ) # Настройка извлечения таблиц extractor2.configure_tables_extraction(process_tables=True) # Выполнение деструктуризации entities2 = extractor2.extract(document2) # Находим ID документа 2 doc2_entity = next((e for e in entities2 if e.type == DocumentAsEntity.__name__), None) if not doc2_entity: logging.error("Не удалось найти DocumentAsEntity для второго документа!") return doc2_id = doc2_entity.id logging.info(f"ID второго документа: {doc2_id}") logging.info(f"Получено {len(entities2)} сущностей из второго документа") all_entities.extend(entities2) logging.info( f"Всего получено {len(all_entities)} сущностей из обоих документов" ) # Шаг 3: Сохранение результатов чанкинга в CSV save_entities_to_csv(all_entities, output_csv_path) # Шаг 4: Загрузка сущностей из CSV и выбор случайных чанков loaded_entities = load_entities_from_csv(output_csv_path) # Шаг 5: Создание InjectionBuilder с InMemoryEntityRepository # Сначала создаем репозиторий со ВСЕМИ загруженными сущностями repository = InMemoryEntityRepository(loaded_entities) builder = InjectionBuilder(repository=repository) # Фильтрация только чанков (сущностей с in_search_text) # Убедимся, что работаем с десериализованными сущностями из репозитория # (Репозиторий уже десериализует при инициализации, если нужно) all_entities_from_repo = repository.get_entities_by_ids( [e.id for e in loaded_entities] ) # Выбираем все сущности с in_search_text selectable_entities = [ e for e in all_entities_from_repo if e.in_search_text is not None ] # Выбор случайных сущностей (от 20 до 30, но не более доступных) num_entities_to_select = min(random.randint(100, 500), len(selectable_entities)) if num_entities_to_select > 0: selected_entities = random.sample( selectable_entities, num_entities_to_select ) selected_ids = [entity.id for entity in selected_entities] logging.info( f"Выбрано {len(selected_ids)} случайных ID сущностей (с in_search_text) для сборки" ) # Дополнительная статистика по документам # Используем репозиторий для получения информации о владельцах selected_entities_details = repository.get_entities_by_ids(selected_ids) # Считаем на основе owner_id doc1_entities_count = sum(1 for e in selected_entities_details if e.owner_id == doc1_id) doc2_entities_count = sum(1 for e in selected_entities_details if e.owner_id == doc2_id) other_owner_count = len(selected_entities_details) - (doc1_entities_count + doc2_entities_count) logging.info( f"Из них {doc1_entities_count} принадлежат первому документу (ID: {doc1_id}), " f"{doc2_entities_count} второму (ID: {doc2_id}) (на основе owner_id). " f"{other_owner_count} имеют другого владельца (вероятно, таблицы/строки)." ) else: logging.warning("Не найдено сущностей с in_search_text для выбора.") selected_ids = [] selected_entities = [] # Добавлено для ясности # Шаг 6: Сборка текста из выбранных ID logging.info("Начало сборки текста из выбранных ID") # Передаем ID, а не сущности, т.к. builder сам их получит из репозитория assembled_text = builder.build( selected_ids, include_tables=True ) # Включаем таблицы # Шаг 7: Сохранение результата в файл with open(output_text_path, "w", encoding="utf-8") as f: f.write(assembled_text.replace('\n', '\n\n')) logging.info(f"Результат сборки сохранен в {output_text_path}") except Exception as e: logging.error(f"Произошла ошибка: {e}", exc_info=True) if __name__ == "__main__": main()