Spaces:
Sleeping
Sleeping
#!/usr/bin/env python | |
""" | |
Скрипт для визуального тестирования процесса чанкинга и сборки документа. | |
Этот скрипт: | |
1. Считывает test_input/test.docx с помощью UniversalParser | |
2. Чанкит документ через Destructurer с fixed_size-стратегией | |
3. Сохраняет результат чанкинга в test_output/test.csv | |
4. Выбирает 20-30 случайных чанков из CSV | |
5. Создает InjectionBuilder с InMemoryEntityRepository | |
6. Собирает текст из выбранных чанков | |
7. Сохраняет результат в test_output/test_builded.txt | |
""" | |
import logging | |
import os | |
import random | |
from pathlib import Path | |
from typing import List | |
import pandas as pd | |
from ntr_fileparser import UniversalParser | |
from ntr_text_fragmentation.chunking.specific_strategies.fixed_size_chunking import \ | |
FixedSizeChunkingStrategy | |
from ntr_text_fragmentation.core.destructurer import Destructurer | |
from ntr_text_fragmentation.core.entity_repository import \ | |
InMemoryEntityRepository | |
from ntr_text_fragmentation.core.injection_builder import InjectionBuilder | |
from ntr_text_fragmentation.models.linker_entity import LinkerEntity | |
def setup_logging() -> None: | |
"""Настройка логгирования.""" | |
logging.basicConfig( | |
level=logging.INFO, | |
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", | |
) | |
def ensure_directories() -> None: | |
"""Проверка наличия необходимых директорий.""" | |
for directory in ["test_input", "test_output"]: | |
Path(directory).mkdir(parents=True, exist_ok=True) | |
def save_entities_to_csv(entities: List[LinkerEntity], csv_path: str) -> None: | |
""" | |
Сохраняет сущности в CSV файл. | |
Args: | |
entities: Список сущностей | |
csv_path: Путь для сохранения CSV файла | |
""" | |
data = [] | |
for entity in entities: | |
# Базовые поля для всех типов сущностей | |
entity_dict = { | |
"id": str(entity.id), | |
"type": entity.type, | |
"name": entity.name, | |
"text": entity.text, | |
"metadata": str(entity.metadata), | |
"in_search_text": entity.in_search_text, | |
"source_id": entity.source_id, | |
"target_id": entity.target_id, | |
"number_in_relation": entity.number_in_relation, | |
} | |
data.append(entity_dict) | |
df = pd.DataFrame(data) | |
df.to_csv(csv_path, index=False) | |
logging.info(f"Сохранено {len(entities)} сущностей в {csv_path}") | |
def load_entities_from_csv(csv_path: str) -> List[LinkerEntity]: | |
""" | |
Загружает сущности из CSV файла. | |
Args: | |
csv_path: Путь к CSV файлу | |
Returns: | |
Список сущностей | |
""" | |
df = pd.read_csv(csv_path) | |
entities = [] | |
for _, row in df.iterrows(): | |
# Обработка метаданных | |
metadata_str = row.get("metadata", "{}") | |
try: | |
metadata = ( | |
eval(metadata_str) if metadata_str and not pd.isna(metadata_str) else {} | |
) | |
except: | |
metadata = {} | |
# Общие поля для всех типов сущностей | |
common_args = { | |
"id": row["id"], | |
"name": row["name"] if not pd.isna(row.get("name", "")) else "", | |
"text": row["text"] if not pd.isna(row.get("text", "")) else "", | |
"metadata": metadata, | |
"in_search_text": row["in_search_text"], | |
"type": row["type"], | |
} | |
# Добавляем поля связи, если они есть | |
if not pd.isna(row.get("source_id", "")): | |
common_args["source_id"] = row["source_id"] | |
common_args["target_id"] = row["target_id"] | |
if not pd.isna(row.get("number_in_relation", "")): | |
common_args["number_in_relation"] = int(row["number_in_relation"]) | |
entity = LinkerEntity(**common_args) | |
entities.append(entity) | |
logging.info(f"Загружено {len(entities)} сущностей из {csv_path}") | |
return entities | |
def main() -> None: | |
"""Основная функция скрипта.""" | |
setup_logging() | |
ensure_directories() | |
# Пути к файлам | |
input_doc_path = "test_input/test.docx" | |
output_csv_path = "test_output/test.csv" | |
output_text_path = "test_output/test_builded.txt" | |
# Проверка наличия входного файла | |
if not os.path.exists(input_doc_path): | |
logging.error(f"Файл {input_doc_path} не найден!") | |
return | |
logging.info(f"Парсинг документа {input_doc_path}") | |
try: | |
# Шаг 1: Парсинг документа дважды, как если бы это были два разных документа | |
parser = UniversalParser() | |
document1 = parser.parse_by_path(input_doc_path) | |
document2 = parser.parse_by_path(input_doc_path) | |
# Меняем название второго документа, чтобы отличить его | |
document2.name = document2.name + "_copy" if document2.name else "copy_doc" | |
# Шаг 2: Чанкинг обоих документов с использованием fixed_size-стратегии | |
all_entities = [] | |
# Обработка первого документа | |
destructurer1 = Destructurer( | |
document1, strategy_name="fixed_size", words_per_chunk=50, overlap_words=25 | |
) | |
logging.info("Начало процесса чанкинга первого документа") | |
entities1 = destructurer1.destructure() | |
# Добавляем метаданные о документе к каждой сущности | |
for entity in entities1: | |
if not hasattr(entity, 'metadata') or entity.metadata is None: | |
entity.metadata = {} | |
entity.metadata['doc_name'] = "document1" | |
logging.info(f"Получено {len(entities1)} сущностей из первого документа") | |
all_entities.extend(entities1) | |
# Обработка второго документа | |
destructurer2 = Destructurer( | |
document2, strategy_name="fixed_size", words_per_chunk=50, overlap_words=25 | |
) | |
logging.info("Начало процесса чанкинга второго документа") | |
entities2 = destructurer2.destructure() | |
# Добавляем метаданные о документе к каждой сущности | |
for entity in entities2: | |
if not hasattr(entity, 'metadata') or entity.metadata is None: | |
entity.metadata = {} | |
entity.metadata['doc_name'] = "document2" | |
logging.info(f"Получено {len(entities2)} сущностей из второго документа") | |
all_entities.extend(entities2) | |
logging.info(f"Всего получено {len(all_entities)} сущностей из обоих документов") | |
# Шаг 3: Сохранение результатов чанкинга в CSV | |
save_entities_to_csv(all_entities, output_csv_path) | |
# Шаг 4: Загрузка сущностей из CSV и выбор случайных чанков | |
loaded_entities = load_entities_from_csv(output_csv_path) | |
# Фильтрация только чанков | |
chunks = [e for e in loaded_entities if e.in_search_text is not None] | |
# Выбор случайных чанков (от 20 до 30) | |
num_chunks_to_select = min(random.randint(20, 30), len(chunks)) | |
selected_chunks = random.sample(chunks, num_chunks_to_select) | |
logging.info(f"Выбрано {len(selected_chunks)} случайных чанков для сборки") | |
# Дополнительная статистика по документам | |
doc1_chunks = [c for c in selected_chunks if hasattr(c, 'metadata') and c.metadata.get('doc_name') == "document1"] | |
doc2_chunks = [c for c in selected_chunks if hasattr(c, 'metadata') and c.metadata.get('doc_name') == "document2"] | |
logging.info(f"Из них {len(doc1_chunks)} чанков из первого документа и {len(doc2_chunks)} из второго") | |
# Шаг 5: Создание InjectionBuilder с InMemoryEntityRepository | |
repository = InMemoryEntityRepository(loaded_entities) | |
builder = InjectionBuilder(repository=repository) | |
# Регистрация стратегии | |
builder.register_strategy("fixed_size", FixedSizeChunkingStrategy) | |
# Шаг 6: Сборка текста из выбранных чанков | |
logging.info("Начало сборки текста из выбранных чанков") | |
assembled_text = builder.build(selected_chunks) | |
# Шаг 7: Сохранение результата в файл | |
with open(output_text_path, "w", encoding="utf-8") as f: | |
f.write(assembled_text) | |
logging.info(f"Результат сборки сохранен в {output_text_path}") | |
# Вывод статистики | |
logging.info(f"Общее количество сущностей: {len(loaded_entities)}") | |
logging.info(f"Количество чанков: {len(chunks)}") | |
logging.info(f"Выбрано для сборки: {len(selected_chunks)}") | |
logging.info(f"Длина собранного текста: {len(assembled_text)} символов") | |
except Exception as e: | |
logging.error(f"Произошла ошибка: {e}", exc_info=True) | |
if __name__ == "__main__": | |
main() | |