#!/usr/bin/env python """ Скрипт для визуального тестирования процесса чанкинга и сборки документа. Этот скрипт: 1. Считывает test_input/test.docx с помощью UniversalParser 2. Чанкит документ через Destructurer с fixed_size-стратегией 3. Сохраняет результат чанкинга в test_output/test.csv 4. Выбирает 20-30 случайных чанков из CSV 5. Создает InjectionBuilder с InMemoryEntityRepository 6. Собирает текст из выбранных чанков 7. Сохраняет результат в test_output/test_builded.txt """ import logging import os import random from pathlib import Path from typing import List import pandas as pd from ntr_fileparser import UniversalParser from ntr_text_fragmentation.chunking.specific_strategies.fixed_size_chunking import \ FixedSizeChunkingStrategy from ntr_text_fragmentation.core.destructurer import Destructurer from ntr_text_fragmentation.core.entity_repository import \ InMemoryEntityRepository from ntr_text_fragmentation.core.injection_builder import InjectionBuilder from ntr_text_fragmentation.models.linker_entity import LinkerEntity def setup_logging() -> None: """Настройка логгирования.""" logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", ) def ensure_directories() -> None: """Проверка наличия необходимых директорий.""" for directory in ["test_input", "test_output"]: Path(directory).mkdir(parents=True, exist_ok=True) def save_entities_to_csv(entities: List[LinkerEntity], csv_path: str) -> None: """ Сохраняет сущности в CSV файл. Args: entities: Список сущностей csv_path: Путь для сохранения CSV файла """ data = [] for entity in entities: # Базовые поля для всех типов сущностей entity_dict = { "id": str(entity.id), "type": entity.type, "name": entity.name, "text": entity.text, "metadata": str(entity.metadata), "in_search_text": entity.in_search_text, "source_id": entity.source_id, "target_id": entity.target_id, "number_in_relation": entity.number_in_relation, } data.append(entity_dict) df = pd.DataFrame(data) df.to_csv(csv_path, index=False) logging.info(f"Сохранено {len(entities)} сущностей в {csv_path}") def load_entities_from_csv(csv_path: str) -> List[LinkerEntity]: """ Загружает сущности из CSV файла. Args: csv_path: Путь к CSV файлу Returns: Список сущностей """ df = pd.read_csv(csv_path) entities = [] for _, row in df.iterrows(): # Обработка метаданных metadata_str = row.get("metadata", "{}") try: metadata = ( eval(metadata_str) if metadata_str and not pd.isna(metadata_str) else {} ) except: metadata = {} # Общие поля для всех типов сущностей common_args = { "id": row["id"], "name": row["name"] if not pd.isna(row.get("name", "")) else "", "text": row["text"] if not pd.isna(row.get("text", "")) else "", "metadata": metadata, "in_search_text": row["in_search_text"], "type": row["type"], } # Добавляем поля связи, если они есть if not pd.isna(row.get("source_id", "")): common_args["source_id"] = row["source_id"] common_args["target_id"] = row["target_id"] if not pd.isna(row.get("number_in_relation", "")): common_args["number_in_relation"] = int(row["number_in_relation"]) entity = LinkerEntity(**common_args) entities.append(entity) logging.info(f"Загружено {len(entities)} сущностей из {csv_path}") return entities def main() -> None: """Основная функция скрипта.""" setup_logging() ensure_directories() # Пути к файлам input_doc_path = "test_input/test.docx" output_csv_path = "test_output/test.csv" output_text_path = "test_output/test_builded.txt" # Проверка наличия входного файла if not os.path.exists(input_doc_path): logging.error(f"Файл {input_doc_path} не найден!") return logging.info(f"Парсинг документа {input_doc_path}") try: # Шаг 1: Парсинг документа дважды, как если бы это были два разных документа parser = UniversalParser() document1 = parser.parse_by_path(input_doc_path) document2 = parser.parse_by_path(input_doc_path) # Меняем название второго документа, чтобы отличить его document2.name = document2.name + "_copy" if document2.name else "copy_doc" # Шаг 2: Чанкинг обоих документов с использованием fixed_size-стратегии all_entities = [] # Обработка первого документа destructurer1 = Destructurer( document1, strategy_name="fixed_size", words_per_chunk=50, overlap_words=25 ) logging.info("Начало процесса чанкинга первого документа") entities1 = destructurer1.destructure() # Добавляем метаданные о документе к каждой сущности for entity in entities1: if not hasattr(entity, 'metadata') or entity.metadata is None: entity.metadata = {} entity.metadata['doc_name'] = "document1" logging.info(f"Получено {len(entities1)} сущностей из первого документа") all_entities.extend(entities1) # Обработка второго документа destructurer2 = Destructurer( document2, strategy_name="fixed_size", words_per_chunk=50, overlap_words=25 ) logging.info("Начало процесса чанкинга второго документа") entities2 = destructurer2.destructure() # Добавляем метаданные о документе к каждой сущности for entity in entities2: if not hasattr(entity, 'metadata') or entity.metadata is None: entity.metadata = {} entity.metadata['doc_name'] = "document2" logging.info(f"Получено {len(entities2)} сущностей из второго документа") all_entities.extend(entities2) logging.info(f"Всего получено {len(all_entities)} сущностей из обоих документов") # Шаг 3: Сохранение результатов чанкинга в CSV save_entities_to_csv(all_entities, output_csv_path) # Шаг 4: Загрузка сущностей из CSV и выбор случайных чанков loaded_entities = load_entities_from_csv(output_csv_path) # Фильтрация только чанков chunks = [e for e in loaded_entities if e.in_search_text is not None] # Выбор случайных чанков (от 20 до 30) num_chunks_to_select = min(random.randint(20, 30), len(chunks)) selected_chunks = random.sample(chunks, num_chunks_to_select) logging.info(f"Выбрано {len(selected_chunks)} случайных чанков для сборки") # Дополнительная статистика по документам doc1_chunks = [c for c in selected_chunks if hasattr(c, 'metadata') and c.metadata.get('doc_name') == "document1"] doc2_chunks = [c for c in selected_chunks if hasattr(c, 'metadata') and c.metadata.get('doc_name') == "document2"] logging.info(f"Из них {len(doc1_chunks)} чанков из первого документа и {len(doc2_chunks)} из второго") # Шаг 5: Создание InjectionBuilder с InMemoryEntityRepository repository = InMemoryEntityRepository(loaded_entities) builder = InjectionBuilder(repository=repository) # Регистрация стратегии builder.register_strategy("fixed_size", FixedSizeChunkingStrategy) # Шаг 6: Сборка текста из выбранных чанков logging.info("Начало сборки текста из выбранных чанков") assembled_text = builder.build(selected_chunks) # Шаг 7: Сохранение результата в файл with open(output_text_path, "w", encoding="utf-8") as f: f.write(assembled_text) logging.info(f"Результат сборки сохранен в {output_text_path}") # Вывод статистики logging.info(f"Общее количество сущностей: {len(loaded_entities)}") logging.info(f"Количество чанков: {len(chunks)}") logging.info(f"Выбрано для сборки: {len(selected_chunks)}") logging.info(f"Длина собранного текста: {len(assembled_text)} символов") except Exception as e: logging.error(f"Произошла ошибка: {e}", exc_info=True) if __name__ == "__main__": main()