generic-chatbot-backend / lib /extractor /scripts /test_chunking_visualization.py
muryshev's picture
update
86c402d
raw
history blame
10.1 kB
#!/usr/bin/env python
"""
Скрипт для визуального тестирования процесса чанкинга и сборки документа.
Этот скрипт:
1. Считывает test_input/test.docx с помощью UniversalParser
2. Чанкит документ через Destructurer с fixed_size-стратегией
3. Сохраняет результат чанкинга в test_output/test.csv
4. Выбирает 20-30 случайных чанков из CSV
5. Создает InjectionBuilder с InMemoryEntityRepository
6. Собирает текст из выбранных чанков
7. Сохраняет результат в test_output/test_builded.txt
"""
import logging
import os
import random
from pathlib import Path
from typing import List
import pandas as pd
from ntr_fileparser import UniversalParser
from ntr_text_fragmentation.chunking.specific_strategies.fixed_size_chunking import \
FixedSizeChunkingStrategy
from ntr_text_fragmentation.core.destructurer import Destructurer
from ntr_text_fragmentation.core.entity_repository import \
InMemoryEntityRepository
from ntr_text_fragmentation.core.injection_builder import InjectionBuilder
from ntr_text_fragmentation.models.linker_entity import LinkerEntity
def setup_logging() -> None:
"""Настройка логгирования."""
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
def ensure_directories() -> None:
"""Проверка наличия необходимых директорий."""
for directory in ["test_input", "test_output"]:
Path(directory).mkdir(parents=True, exist_ok=True)
def save_entities_to_csv(entities: List[LinkerEntity], csv_path: str) -> None:
"""
Сохраняет сущности в CSV файл.
Args:
entities: Список сущностей
csv_path: Путь для сохранения CSV файла
"""
data = []
for entity in entities:
# Базовые поля для всех типов сущностей
entity_dict = {
"id": str(entity.id),
"type": entity.type,
"name": entity.name,
"text": entity.text,
"metadata": str(entity.metadata),
"in_search_text": entity.in_search_text,
"source_id": entity.source_id,
"target_id": entity.target_id,
"number_in_relation": entity.number_in_relation,
}
data.append(entity_dict)
df = pd.DataFrame(data)
df.to_csv(csv_path, index=False)
logging.info(f"Сохранено {len(entities)} сущностей в {csv_path}")
def load_entities_from_csv(csv_path: str) -> List[LinkerEntity]:
"""
Загружает сущности из CSV файла.
Args:
csv_path: Путь к CSV файлу
Returns:
Список сущностей
"""
df = pd.read_csv(csv_path)
entities = []
for _, row in df.iterrows():
# Обработка метаданных
metadata_str = row.get("metadata", "{}")
try:
metadata = (
eval(metadata_str) if metadata_str and not pd.isna(metadata_str) else {}
)
except:
metadata = {}
# Общие поля для всех типов сущностей
common_args = {
"id": row["id"],
"name": row["name"] if not pd.isna(row.get("name", "")) else "",
"text": row["text"] if not pd.isna(row.get("text", "")) else "",
"metadata": metadata,
"in_search_text": row["in_search_text"],
"type": row["type"],
}
# Добавляем поля связи, если они есть
if not pd.isna(row.get("source_id", "")):
common_args["source_id"] = row["source_id"]
common_args["target_id"] = row["target_id"]
if not pd.isna(row.get("number_in_relation", "")):
common_args["number_in_relation"] = int(row["number_in_relation"])
entity = LinkerEntity(**common_args)
entities.append(entity)
logging.info(f"Загружено {len(entities)} сущностей из {csv_path}")
return entities
def main() -> None:
"""Основная функция скрипта."""
setup_logging()
ensure_directories()
# Пути к файлам
input_doc_path = "test_input/test.docx"
output_csv_path = "test_output/test.csv"
output_text_path = "test_output/test_builded.txt"
# Проверка наличия входного файла
if not os.path.exists(input_doc_path):
logging.error(f"Файл {input_doc_path} не найден!")
return
logging.info(f"Парсинг документа {input_doc_path}")
try:
# Шаг 1: Парсинг документа дважды, как если бы это были два разных документа
parser = UniversalParser()
document1 = parser.parse_by_path(input_doc_path)
document2 = parser.parse_by_path(input_doc_path)
# Меняем название второго документа, чтобы отличить его
document2.name = document2.name + "_copy" if document2.name else "copy_doc"
# Шаг 2: Чанкинг обоих документов с использованием fixed_size-стратегии
all_entities = []
# Обработка первого документа
destructurer1 = Destructurer(
document1, strategy_name="fixed_size", words_per_chunk=50, overlap_words=25
)
logging.info("Начало процесса чанкинга первого документа")
entities1 = destructurer1.destructure()
# Добавляем метаданные о документе к каждой сущности
for entity in entities1:
if not hasattr(entity, 'metadata') or entity.metadata is None:
entity.metadata = {}
entity.metadata['doc_name'] = "document1"
logging.info(f"Получено {len(entities1)} сущностей из первого документа")
all_entities.extend(entities1)
# Обработка второго документа
destructurer2 = Destructurer(
document2, strategy_name="fixed_size", words_per_chunk=50, overlap_words=25
)
logging.info("Начало процесса чанкинга второго документа")
entities2 = destructurer2.destructure()
# Добавляем метаданные о документе к каждой сущности
for entity in entities2:
if not hasattr(entity, 'metadata') or entity.metadata is None:
entity.metadata = {}
entity.metadata['doc_name'] = "document2"
logging.info(f"Получено {len(entities2)} сущностей из второго документа")
all_entities.extend(entities2)
logging.info(f"Всего получено {len(all_entities)} сущностей из обоих документов")
# Шаг 3: Сохранение результатов чанкинга в CSV
save_entities_to_csv(all_entities, output_csv_path)
# Шаг 4: Загрузка сущностей из CSV и выбор случайных чанков
loaded_entities = load_entities_from_csv(output_csv_path)
# Фильтрация только чанков
chunks = [e for e in loaded_entities if e.in_search_text is not None]
# Выбор случайных чанков (от 20 до 30)
num_chunks_to_select = min(random.randint(20, 30), len(chunks))
selected_chunks = random.sample(chunks, num_chunks_to_select)
logging.info(f"Выбрано {len(selected_chunks)} случайных чанков для сборки")
# Дополнительная статистика по документам
doc1_chunks = [c for c in selected_chunks if hasattr(c, 'metadata') and c.metadata.get('doc_name') == "document1"]
doc2_chunks = [c for c in selected_chunks if hasattr(c, 'metadata') and c.metadata.get('doc_name') == "document2"]
logging.info(f"Из них {len(doc1_chunks)} чанков из первого документа и {len(doc2_chunks)} из второго")
# Шаг 5: Создание InjectionBuilder с InMemoryEntityRepository
repository = InMemoryEntityRepository(loaded_entities)
builder = InjectionBuilder(repository=repository)
# Регистрация стратегии
builder.register_strategy("fixed_size", FixedSizeChunkingStrategy)
# Шаг 6: Сборка текста из выбранных чанков
logging.info("Начало сборки текста из выбранных чанков")
assembled_text = builder.build(selected_chunks)
# Шаг 7: Сохранение результата в файл
with open(output_text_path, "w", encoding="utf-8") as f:
f.write(assembled_text)
logging.info(f"Результат сборки сохранен в {output_text_path}")
# Вывод статистики
logging.info(f"Общее количество сущностей: {len(loaded_entities)}")
logging.info(f"Количество чанков: {len(chunks)}")
logging.info(f"Выбрано для сборки: {len(selected_chunks)}")
logging.info(f"Длина собранного текста: {len(assembled_text)} символов")
except Exception as e:
logging.error(f"Произошла ошибка: {e}", exc_info=True)
if __name__ == "__main__":
main()