muryshev's picture
update
744a170
raw
history blame
16.9 kB
#!/usr/bin/env python
"""
Скрипт для визуального тестирования процесса чанкинга и сборки документа.
Этот скрипт:
1. Считывает test_input/test.docx с помощью UniversalParser
2. Чанкит документ через Destructurer с fixed_size-стратегией
3. Сохраняет результат чанкинга в test_output/test.csv
4. Выбирает 20-30 случайных чанков из CSV
5. Создает InjectionBuilder с InMemoryEntityRepository
6. Собирает текст из выбранных чанков
7. Сохраняет результат в test_output/test_builded.txt
"""
import json
import logging
import os
import random
from pathlib import Path
from typing import List
from uuid import UUID
import pandas as pd
from ntr_fileparser import UniversalParser
from ntr_text_fragmentation import (DocumentAsEntity, EntitiesExtractor,
InjectionBuilder, InMemoryEntityRepository,
LinkerEntity)
def setup_logging() -> None:
"""Настройка логгирования."""
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - [%(pathname)s:%(lineno)d] - %(message)s",
)
def ensure_directories() -> None:
"""Проверка наличия необходимых директорий."""
for directory in ["test_input", "test_output"]:
Path(directory).mkdir(parents=True, exist_ok=True)
def save_entities_to_csv(entities: List[LinkerEntity], csv_path: str) -> None:
"""
Сохраняет сущности в CSV файл.
Args:
entities: Список сущностей
csv_path: Путь для сохранения CSV файла
"""
data = []
for entity in entities:
# Базовые поля для всех типов сущностей
entity_dict = {
"id": str(entity.id),
"type": entity.type,
"name": entity.name,
"text": entity.text,
"metadata": json.dumps(entity.metadata or {}, ensure_ascii=False),
"in_search_text": entity.in_search_text,
"source_id": str(entity.source_id) if entity.source_id else None,
"target_id": str(entity.target_id) if entity.target_id else None,
"number_in_relation": entity.number_in_relation,
"groupper": entity.groupper,
"type": entity.type,
}
# Дополнительные поля специфичные для подклассов (если они есть в __dict__)
# Это не самый надежный способ, но для скрипта визуализации может подойти
# Сериализация LinkerEntity теперь должна сама класть доп поля в metadata
# for key, value in entity.__dict__.items():
# if key not in entity_dict and not key.startswith('_'):
# entity_dict[key] = value
data.append(entity_dict)
df = pd.DataFrame(data)
# Указываем кодировку UTF-8 при записи CSV
df.to_csv(csv_path, index=False, encoding='utf-8')
logging.info(f"Сохранено {len(entities)} сущностей в {csv_path}")
def load_entities_from_csv(csv_path: str) -> List[LinkerEntity]:
"""
Загружает сущности из CSV файла.
Args:
csv_path: Путь к CSV файлу
Returns:
Список сущностей
"""
df = pd.read_csv(csv_path)
entities = []
for _, row in df.iterrows():
# Обработка метаданных
metadata_str = row.get("metadata", "{}")
try:
# Используем json.loads для парсинга JSON строки
metadata = (
json.loads(metadata_str)
if pd.notna(metadata_str) and metadata_str
else {}
)
except json.JSONDecodeError: # Ловим ошибку JSON
logging.warning(
f"Не удалось распарсить метаданные JSON: {metadata_str}. Используется пустой словарь."
)
metadata = {}
# Общие поля для всех типов сущностей
# Преобразуем ID обратно в UUID
entity_id = row['id']
if isinstance(entity_id, str):
try:
entity_id = UUID(entity_id)
except ValueError:
logging.warning(
f"Неверный формат UUID для id: {entity_id}. Пропускаем сущность."
)
continue
common_args = {
"id": entity_id,
"name": row["name"] if pd.notna(row.get("name")) else "",
"text": row["text"] if pd.notna(row.get("text")) else "",
"metadata": metadata,
"in_search_text": (
row["in_search_text"] if pd.notna(row.get('in_search_text')) else None
),
"type": (
row["type"] if pd.notna(row.get('type')) else LinkerEntity.__name__
), # Используем базовый тип, если не указан
"groupper": row["groupper"] if pd.notna(row.get("groupper")) else None,
}
# Добавляем поля связи, если они есть, преобразуя в UUID
source_id_str = row.get("source_id")
target_id_str = row.get("target_id")
if pd.notna(source_id_str):
try:
common_args["source_id"] = UUID(source_id_str)
except ValueError:
logging.warning(
f"Неверный формат UUID для source_id: {source_id_str}. Пропускаем поле."
)
if pd.notna(target_id_str):
try:
common_args["target_id"] = UUID(target_id_str)
except ValueError:
logging.warning(
f"Неверный формат UUID для target_id: {target_id_str}. Пропускаем поле."
)
if pd.notna(row.get("number_in_relation")):
try:
common_args["number_in_relation"] = int(row["number_in_relation"])
except ValueError:
logging.warning(
f"Неверный формат для number_in_relation: {row['number_in_relation']}. Пропускаем поле."
)
# Пытаемся десериализовать в конкретный тип, если он известен
entity_class = LinkerEntity._entity_classes.get(
common_args["type"], LinkerEntity
)
try:
# Создаем экземпляр, передавая только те аргументы, которые ожидает класс
# (используя LinkerEntity._deserialize_to_me как пример, но нужно убедиться,
# что он принимает все нужные поля или имеет **kwargs)
# Пока создаем базовый LinkerEntity, т.к. подклассы могут требовать специфичные поля
# которых нет в CSV или в common_args
entity = LinkerEntity(**common_args)
# Если нужно строгое восстановление типов, потребуется более сложная логика
# с проверкой полей каждого подкласса
except TypeError as e:
logging.warning(
f"Ошибка создания экземпляра {entity_class.__name__} для ID {common_args['id']}: {e}. Создан базовый LinkerEntity."
)
entity = LinkerEntity(**common_args) # Откат к базовому классу
entities.append(entity)
logging.info(f"Загружено {len(entities)} сущностей из {csv_path}")
return entities
def main() -> None:
"""Основная функция скрипта."""
setup_logging()
ensure_directories()
# Пути к файлам
input_doc_path = "test_input/test2.docx"
output_csv_path = "test_output/test2.csv"
output_text_path = "test_output/test2.md"
# Проверка наличия входного файла
if not os.path.exists(input_doc_path):
logging.error(f"Файл {input_doc_path} не найден!")
return
logging.info(f"Парсинг документа {input_doc_path}")
try:
# Шаг 1: Парсинг документа дважды, как если бы это были два разных документа
parser = UniversalParser()
document1 = parser.parse_by_path(input_doc_path)
document2 = parser.parse_by_path(input_doc_path)
# Меняем название второго документа, чтобы отличить его
document2.name = document2.name + "_copy" if document2.name else "copy_doc"
# Шаг 2: Чанкинг и извлечение таблиц с использованием EntitiesExtractor
all_entities = []
# Обработка первого документа
logging.info("Начало процесса деструктуризации первого документа")
# Инициализируем экстрактор без документа (используем дефолтные настройки или настроим позже)
extractor1 = EntitiesExtractor()
# Настройка чанкинга
extractor1.configure_chunking(
strategy_name="fixed_size",
strategy_params={
"words_per_chunk": 50,
"overlap_words": 25,
"respect_sentence_boundaries": True, # Добавлено по запросу
},
)
# Настройка извлечения таблиц
extractor1.configure_tables_extraction(process_tables=True)
# Выполнение деструктуризации
entities1 = extractor1.extract(document1)
# Находим ID документа 1
doc1_entity = next((e for e in entities1 if e.type == DocumentAsEntity.__name__), None)
if not doc1_entity:
logging.error("Не удалось найти DocumentAsEntity для первого документа!")
return
doc1_id = doc1_entity.id
logging.info(f"ID первого документа: {doc1_id}")
logging.info(f"Получено {len(entities1)} сущностей из первого документа")
all_entities.extend(entities1)
# Обработка второго документа
logging.info("Начало процесса деструктуризации второго документа")
# Инициализируем экстрактор без документа
extractor2 = EntitiesExtractor()
# Настройка чанкинга (те же параметры)
extractor2.configure_chunking(
strategy_name="fixed_size",
strategy_params={
"words_per_chunk": 50,
"overlap_words": 25,
"respect_sentence_boundaries": True,
},
)
# Настройка извлечения таблиц
extractor2.configure_tables_extraction(process_tables=True)
# Выполнение деструктуризации
entities2 = extractor2.extract(document2)
# Находим ID документа 2
doc2_entity = next((e for e in entities2 if e.type == DocumentAsEntity.__name__), None)
if not doc2_entity:
logging.error("Не удалось найти DocumentAsEntity для второго документа!")
return
doc2_id = doc2_entity.id
logging.info(f"ID второго документа: {doc2_id}")
logging.info(f"Получено {len(entities2)} сущностей из второго документа")
all_entities.extend(entities2)
logging.info(
f"Всего получено {len(all_entities)} сущностей из обоих документов"
)
# Шаг 3: Сохранение результатов чанкинга в CSV
save_entities_to_csv(all_entities, output_csv_path)
# Шаг 4: Загрузка сущностей из CSV и выбор случайных чанков
loaded_entities = load_entities_from_csv(output_csv_path)
# Шаг 5: Создание InjectionBuilder с InMemoryEntityRepository
# Сначала создаем репозиторий со ВСЕМИ загруженными сущностями
repository = InMemoryEntityRepository(loaded_entities)
builder = InjectionBuilder(repository=repository)
# Фильтрация только чанков (сущностей с in_search_text)
# Убедимся, что работаем с десериализованными сущностями из репозитория
# (Репозиторий уже десериализует при инициализации, если нужно)
all_entities_from_repo = repository.get_entities_by_ids(
[e.id for e in loaded_entities]
)
# Выбираем все сущности с in_search_text
selectable_entities = [
e for e in all_entities_from_repo if e.in_search_text is not None
]
# Выбор случайных сущностей (от 20 до 30, но не более доступных)
num_entities_to_select = min(random.randint(100, 500), len(selectable_entities))
if num_entities_to_select > 0:
selected_entities = random.sample(
selectable_entities, num_entities_to_select
)
selected_ids = [entity.id for entity in selected_entities]
logging.info(
f"Выбрано {len(selected_ids)} случайных ID сущностей (с in_search_text) для сборки"
)
# Дополнительная статистика по документам
# Используем репозиторий для получения информации о владельцах
selected_entities_details = repository.get_entities_by_ids(selected_ids)
# Считаем на основе owner_id
doc1_entities_count = sum(1 for e in selected_entities_details if e.owner_id == doc1_id)
doc2_entities_count = sum(1 for e in selected_entities_details if e.owner_id == doc2_id)
other_owner_count = len(selected_entities_details) - (doc1_entities_count + doc2_entities_count)
logging.info(
f"Из них {doc1_entities_count} принадлежат первому документу (ID: {doc1_id}), "
f"{doc2_entities_count} второму (ID: {doc2_id}) (на основе owner_id). "
f"{other_owner_count} имеют другого владельца (вероятно, таблицы/строки)."
)
else:
logging.warning("Не найдено сущностей с in_search_text для выбора.")
selected_ids = []
selected_entities = [] # Добавлено для ясности
# Шаг 6: Сборка текста из выбранных ID
logging.info("Начало сборки текста из выбранных ID")
# Передаем ID, а не сущности, т.к. builder сам их получит из репозитория
assembled_text = builder.build(
selected_ids, include_tables=True
) # Включаем таблицы
# Шаг 7: Сохранение результата в файл
with open(output_text_path, "w", encoding="utf-8") as f:
f.write(assembled_text.replace('\n', '\n\n'))
logging.info(f"Результат сборки сохранен в {output_text_path}")
except Exception as e:
logging.error(f"Произошла ошибка: {e}", exc_info=True)
if __name__ == "__main__":
main()