Spaces:
Sleeping
Sleeping
#!/usr/bin/env python | |
""" | |
Скрипт для визуального тестирования процесса чанкинга и сборки документа. | |
Этот скрипт: | |
1. Считывает test_input/test.docx с помощью UniversalParser | |
2. Чанкит документ через Destructurer с fixed_size-стратегией | |
3. Сохраняет результат чанкинга в test_output/test.csv | |
4. Выбирает 20-30 случайных чанков из CSV | |
5. Создает InjectionBuilder с InMemoryEntityRepository | |
6. Собирает текст из выбранных чанков | |
7. Сохраняет результат в test_output/test_builded.txt | |
""" | |
import json | |
import logging | |
import os | |
import random | |
from pathlib import Path | |
from typing import List | |
from uuid import UUID | |
import pandas as pd | |
from ntr_fileparser import UniversalParser | |
from ntr_text_fragmentation import (DocumentAsEntity, EntitiesExtractor, | |
InjectionBuilder, InMemoryEntityRepository, | |
LinkerEntity) | |
def setup_logging() -> None: | |
"""Настройка логгирования.""" | |
logging.basicConfig( | |
level=logging.INFO, | |
format="%(asctime)s - %(levelname)s - [%(pathname)s:%(lineno)d] - %(message)s", | |
) | |
def ensure_directories() -> None: | |
"""Проверка наличия необходимых директорий.""" | |
for directory in ["test_input", "test_output"]: | |
Path(directory).mkdir(parents=True, exist_ok=True) | |
def save_entities_to_csv(entities: List[LinkerEntity], csv_path: str) -> None: | |
""" | |
Сохраняет сущности в CSV файл. | |
Args: | |
entities: Список сущностей | |
csv_path: Путь для сохранения CSV файла | |
""" | |
data = [] | |
for entity in entities: | |
# Базовые поля для всех типов сущностей | |
entity_dict = { | |
"id": str(entity.id), | |
"type": entity.type, | |
"name": entity.name, | |
"text": entity.text, | |
"metadata": json.dumps(entity.metadata or {}, ensure_ascii=False), | |
"in_search_text": entity.in_search_text, | |
"source_id": str(entity.source_id) if entity.source_id else None, | |
"target_id": str(entity.target_id) if entity.target_id else None, | |
"number_in_relation": entity.number_in_relation, | |
"groupper": entity.groupper, | |
"type": entity.type, | |
} | |
# Дополнительные поля специфичные для подклассов (если они есть в __dict__) | |
# Это не самый надежный способ, но для скрипта визуализации может подойти | |
# Сериализация LinkerEntity теперь должна сама класть доп поля в metadata | |
# for key, value in entity.__dict__.items(): | |
# if key not in entity_dict and not key.startswith('_'): | |
# entity_dict[key] = value | |
data.append(entity_dict) | |
df = pd.DataFrame(data) | |
# Указываем кодировку UTF-8 при записи CSV | |
df.to_csv(csv_path, index=False, encoding='utf-8') | |
logging.info(f"Сохранено {len(entities)} сущностей в {csv_path}") | |
def load_entities_from_csv(csv_path: str) -> List[LinkerEntity]: | |
""" | |
Загружает сущности из CSV файла. | |
Args: | |
csv_path: Путь к CSV файлу | |
Returns: | |
Список сущностей | |
""" | |
df = pd.read_csv(csv_path) | |
entities = [] | |
for _, row in df.iterrows(): | |
# Обработка метаданных | |
metadata_str = row.get("metadata", "{}") | |
try: | |
# Используем json.loads для парсинга JSON строки | |
metadata = ( | |
json.loads(metadata_str) | |
if pd.notna(metadata_str) and metadata_str | |
else {} | |
) | |
except json.JSONDecodeError: # Ловим ошибку JSON | |
logging.warning( | |
f"Не удалось распарсить метаданные JSON: {metadata_str}. Используется пустой словарь." | |
) | |
metadata = {} | |
# Общие поля для всех типов сущностей | |
# Преобразуем ID обратно в UUID | |
entity_id = row['id'] | |
if isinstance(entity_id, str): | |
try: | |
entity_id = UUID(entity_id) | |
except ValueError: | |
logging.warning( | |
f"Неверный формат UUID для id: {entity_id}. Пропускаем сущность." | |
) | |
continue | |
common_args = { | |
"id": entity_id, | |
"name": row["name"] if pd.notna(row.get("name")) else "", | |
"text": row["text"] if pd.notna(row.get("text")) else "", | |
"metadata": metadata, | |
"in_search_text": ( | |
row["in_search_text"] if pd.notna(row.get('in_search_text')) else None | |
), | |
"type": ( | |
row["type"] if pd.notna(row.get('type')) else LinkerEntity.__name__ | |
), # Используем базовый тип, если не указан | |
"groupper": row["groupper"] if pd.notna(row.get("groupper")) else None, | |
} | |
# Добавляем поля связи, если они есть, преобразуя в UUID | |
source_id_str = row.get("source_id") | |
target_id_str = row.get("target_id") | |
if pd.notna(source_id_str): | |
try: | |
common_args["source_id"] = UUID(source_id_str) | |
except ValueError: | |
logging.warning( | |
f"Неверный формат UUID для source_id: {source_id_str}. Пропускаем поле." | |
) | |
if pd.notna(target_id_str): | |
try: | |
common_args["target_id"] = UUID(target_id_str) | |
except ValueError: | |
logging.warning( | |
f"Неверный формат UUID для target_id: {target_id_str}. Пропускаем поле." | |
) | |
if pd.notna(row.get("number_in_relation")): | |
try: | |
common_args["number_in_relation"] = int(row["number_in_relation"]) | |
except ValueError: | |
logging.warning( | |
f"Неверный формат для number_in_relation: {row['number_in_relation']}. Пропускаем поле." | |
) | |
# Пытаемся десериализовать в конкретный тип, если он известен | |
entity_class = LinkerEntity._entity_classes.get( | |
common_args["type"], LinkerEntity | |
) | |
try: | |
# Создаем экземпляр, передавая только те аргументы, которые ожидает класс | |
# (используя LinkerEntity._deserialize_to_me как пример, но нужно убедиться, | |
# что он принимает все нужные поля или имеет **kwargs) | |
# Пока создаем базовый LinkerEntity, т.к. подклассы могут требовать специфичные поля | |
# которых нет в CSV или в common_args | |
entity = LinkerEntity(**common_args) | |
# Если нужно строгое восстановление типов, потребуется более сложная логика | |
# с проверкой полей каждого подкласса | |
except TypeError as e: | |
logging.warning( | |
f"Ошибка создания экземпляра {entity_class.__name__} для ID {common_args['id']}: {e}. Создан базовый LinkerEntity." | |
) | |
entity = LinkerEntity(**common_args) # Откат к базовому классу | |
entities.append(entity) | |
logging.info(f"Загружено {len(entities)} сущностей из {csv_path}") | |
return entities | |
def main() -> None: | |
"""Основная функция скрипта.""" | |
setup_logging() | |
ensure_directories() | |
# Пути к файлам | |
input_doc_path = "test_input/test2.docx" | |
output_csv_path = "test_output/test2.csv" | |
output_text_path = "test_output/test2.md" | |
# Проверка наличия входного файла | |
if not os.path.exists(input_doc_path): | |
logging.error(f"Файл {input_doc_path} не найден!") | |
return | |
logging.info(f"Парсинг документа {input_doc_path}") | |
try: | |
# Шаг 1: Парсинг документа дважды, как если бы это были два разных документа | |
parser = UniversalParser() | |
document1 = parser.parse_by_path(input_doc_path) | |
document2 = parser.parse_by_path(input_doc_path) | |
# Меняем название второго документа, чтобы отличить его | |
document2.name = document2.name + "_copy" if document2.name else "copy_doc" | |
# Шаг 2: Чанкинг и извлечение таблиц с использованием EntitiesExtractor | |
all_entities = [] | |
# Обработка первого документа | |
logging.info("Начало процесса деструктуризации первого документа") | |
# Инициализируем экстрактор без документа (используем дефолтные настройки или настроим позже) | |
extractor1 = EntitiesExtractor() | |
# Настройка чанкинга | |
extractor1.configure_chunking( | |
strategy_name="fixed_size", | |
strategy_params={ | |
"words_per_chunk": 50, | |
"overlap_words": 25, | |
"respect_sentence_boundaries": True, # Добавлено по запросу | |
}, | |
) | |
# Настройка извлечения таблиц | |
extractor1.configure_tables_extraction(process_tables=True) | |
# Выполнение деструктуризации | |
entities1 = extractor1.extract(document1) | |
# Находим ID документа 1 | |
doc1_entity = next((e for e in entities1 if e.type == DocumentAsEntity.__name__), None) | |
if not doc1_entity: | |
logging.error("Не удалось найти DocumentAsEntity для первого документа!") | |
return | |
doc1_id = doc1_entity.id | |
logging.info(f"ID первого документа: {doc1_id}") | |
logging.info(f"Получено {len(entities1)} сущностей из первого документа") | |
all_entities.extend(entities1) | |
# Обработка второго документа | |
logging.info("Начало процесса деструктуризации второго документа") | |
# Инициализируем экстрактор без документа | |
extractor2 = EntitiesExtractor() | |
# Настройка чанкинга (те же параметры) | |
extractor2.configure_chunking( | |
strategy_name="fixed_size", | |
strategy_params={ | |
"words_per_chunk": 50, | |
"overlap_words": 25, | |
"respect_sentence_boundaries": True, | |
}, | |
) | |
# Настройка извлечения таблиц | |
extractor2.configure_tables_extraction(process_tables=True) | |
# Выполнение деструктуризации | |
entities2 = extractor2.extract(document2) | |
# Находим ID документа 2 | |
doc2_entity = next((e for e in entities2 if e.type == DocumentAsEntity.__name__), None) | |
if not doc2_entity: | |
logging.error("Не удалось найти DocumentAsEntity для второго документа!") | |
return | |
doc2_id = doc2_entity.id | |
logging.info(f"ID второго документа: {doc2_id}") | |
logging.info(f"Получено {len(entities2)} сущностей из второго документа") | |
all_entities.extend(entities2) | |
logging.info( | |
f"Всего получено {len(all_entities)} сущностей из обоих документов" | |
) | |
# Шаг 3: Сохранение результатов чанкинга в CSV | |
save_entities_to_csv(all_entities, output_csv_path) | |
# Шаг 4: Загрузка сущностей из CSV и выбор случайных чанков | |
loaded_entities = load_entities_from_csv(output_csv_path) | |
# Шаг 5: Создание InjectionBuilder с InMemoryEntityRepository | |
# Сначала создаем репозиторий со ВСЕМИ загруженными сущностями | |
repository = InMemoryEntityRepository(loaded_entities) | |
builder = InjectionBuilder(repository=repository) | |
# Фильтрация только чанков (сущностей с in_search_text) | |
# Убедимся, что работаем с десериализованными сущностями из репозитория | |
# (Репозиторий уже десериализует при инициализации, если нужно) | |
all_entities_from_repo = repository.get_entities_by_ids( | |
[e.id for e in loaded_entities] | |
) | |
# Выбираем все сущности с in_search_text | |
selectable_entities = [ | |
e for e in all_entities_from_repo if e.in_search_text is not None | |
] | |
# Выбор случайных сущностей (от 20 до 30, но не более доступных) | |
num_entities_to_select = min(random.randint(100, 500), len(selectable_entities)) | |
if num_entities_to_select > 0: | |
selected_entities = random.sample( | |
selectable_entities, num_entities_to_select | |
) | |
selected_ids = [entity.id for entity in selected_entities] | |
logging.info( | |
f"Выбрано {len(selected_ids)} случайных ID сущностей (с in_search_text) для сборки" | |
) | |
# Дополнительная статистика по документам | |
# Используем репозиторий для получения информации о владельцах | |
selected_entities_details = repository.get_entities_by_ids(selected_ids) | |
# Считаем на основе owner_id | |
doc1_entities_count = sum(1 for e in selected_entities_details if e.owner_id == doc1_id) | |
doc2_entities_count = sum(1 for e in selected_entities_details if e.owner_id == doc2_id) | |
other_owner_count = len(selected_entities_details) - (doc1_entities_count + doc2_entities_count) | |
logging.info( | |
f"Из них {doc1_entities_count} принадлежат первому документу (ID: {doc1_id}), " | |
f"{doc2_entities_count} второму (ID: {doc2_id}) (на основе owner_id). " | |
f"{other_owner_count} имеют другого владельца (вероятно, таблицы/строки)." | |
) | |
else: | |
logging.warning("Не найдено сущностей с in_search_text для выбора.") | |
selected_ids = [] | |
selected_entities = [] # Добавлено для ясности | |
# Шаг 6: Сборка текста из выбранных ID | |
logging.info("Начало сборки текста из выбранных ID") | |
# Передаем ID, а не сущности, т.к. builder сам их получит из репозитория | |
assembled_text = builder.build( | |
selected_ids, include_tables=True | |
) # Включаем таблицы | |
# Шаг 7: Сохранение результата в файл | |
with open(output_text_path, "w", encoding="utf-8") as f: | |
f.write(assembled_text.replace('\n', '\n\n')) | |
logging.info(f"Результат сборки сохранен в {output_text_path}") | |
except Exception as e: | |
logging.error(f"Произошла ошибка: {e}", exc_info=True) | |
if __name__ == "__main__": | |
main() | |