Spaces:
Sleeping
Sleeping
File size: 10,082 Bytes
86c402d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 |
#!/usr/bin/env python
"""
Скрипт для визуального тестирования процесса чанкинга и сборки документа.
Этот скрипт:
1. Считывает test_input/test.docx с помощью UniversalParser
2. Чанкит документ через Destructurer с fixed_size-стратегией
3. Сохраняет результат чанкинга в test_output/test.csv
4. Выбирает 20-30 случайных чанков из CSV
5. Создает InjectionBuilder с InMemoryEntityRepository
6. Собирает текст из выбранных чанков
7. Сохраняет результат в test_output/test_builded.txt
"""
import logging
import os
import random
from pathlib import Path
from typing import List
import pandas as pd
from ntr_fileparser import UniversalParser
from ntr_text_fragmentation.chunking.specific_strategies.fixed_size_chunking import \
FixedSizeChunkingStrategy
from ntr_text_fragmentation.core.destructurer import Destructurer
from ntr_text_fragmentation.core.entity_repository import \
InMemoryEntityRepository
from ntr_text_fragmentation.core.injection_builder import InjectionBuilder
from ntr_text_fragmentation.models.linker_entity import LinkerEntity
def setup_logging() -> None:
"""Настройка логгирования."""
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
def ensure_directories() -> None:
"""Проверка наличия необходимых директорий."""
for directory in ["test_input", "test_output"]:
Path(directory).mkdir(parents=True, exist_ok=True)
def save_entities_to_csv(entities: List[LinkerEntity], csv_path: str) -> None:
"""
Сохраняет сущности в CSV файл.
Args:
entities: Список сущностей
csv_path: Путь для сохранения CSV файла
"""
data = []
for entity in entities:
# Базовые поля для всех типов сущностей
entity_dict = {
"id": str(entity.id),
"type": entity.type,
"name": entity.name,
"text": entity.text,
"metadata": str(entity.metadata),
"in_search_text": entity.in_search_text,
"source_id": entity.source_id,
"target_id": entity.target_id,
"number_in_relation": entity.number_in_relation,
}
data.append(entity_dict)
df = pd.DataFrame(data)
df.to_csv(csv_path, index=False)
logging.info(f"Сохранено {len(entities)} сущностей в {csv_path}")
def load_entities_from_csv(csv_path: str) -> List[LinkerEntity]:
"""
Загружает сущности из CSV файла.
Args:
csv_path: Путь к CSV файлу
Returns:
Список сущностей
"""
df = pd.read_csv(csv_path)
entities = []
for _, row in df.iterrows():
# Обработка метаданных
metadata_str = row.get("metadata", "{}")
try:
metadata = (
eval(metadata_str) if metadata_str and not pd.isna(metadata_str) else {}
)
except:
metadata = {}
# Общие поля для всех типов сущностей
common_args = {
"id": row["id"],
"name": row["name"] if not pd.isna(row.get("name", "")) else "",
"text": row["text"] if not pd.isna(row.get("text", "")) else "",
"metadata": metadata,
"in_search_text": row["in_search_text"],
"type": row["type"],
}
# Добавляем поля связи, если они есть
if not pd.isna(row.get("source_id", "")):
common_args["source_id"] = row["source_id"]
common_args["target_id"] = row["target_id"]
if not pd.isna(row.get("number_in_relation", "")):
common_args["number_in_relation"] = int(row["number_in_relation"])
entity = LinkerEntity(**common_args)
entities.append(entity)
logging.info(f"Загружено {len(entities)} сущностей из {csv_path}")
return entities
def main() -> None:
"""Основная функция скрипта."""
setup_logging()
ensure_directories()
# Пути к файлам
input_doc_path = "test_input/test.docx"
output_csv_path = "test_output/test.csv"
output_text_path = "test_output/test_builded.txt"
# Проверка наличия входного файла
if not os.path.exists(input_doc_path):
logging.error(f"Файл {input_doc_path} не найден!")
return
logging.info(f"Парсинг документа {input_doc_path}")
try:
# Шаг 1: Парсинг документа дважды, как если бы это были два разных документа
parser = UniversalParser()
document1 = parser.parse_by_path(input_doc_path)
document2 = parser.parse_by_path(input_doc_path)
# Меняем название второго документа, чтобы отличить его
document2.name = document2.name + "_copy" if document2.name else "copy_doc"
# Шаг 2: Чанкинг обоих документов с использованием fixed_size-стратегии
all_entities = []
# Обработка первого документа
destructurer1 = Destructurer(
document1, strategy_name="fixed_size", words_per_chunk=50, overlap_words=25
)
logging.info("Начало процесса чанкинга первого документа")
entities1 = destructurer1.destructure()
# Добавляем метаданные о документе к каждой сущности
for entity in entities1:
if not hasattr(entity, 'metadata') or entity.metadata is None:
entity.metadata = {}
entity.metadata['doc_name'] = "document1"
logging.info(f"Получено {len(entities1)} сущностей из первого документа")
all_entities.extend(entities1)
# Обработка второго документа
destructurer2 = Destructurer(
document2, strategy_name="fixed_size", words_per_chunk=50, overlap_words=25
)
logging.info("Начало процесса чанкинга второго документа")
entities2 = destructurer2.destructure()
# Добавляем метаданные о документе к каждой сущности
for entity in entities2:
if not hasattr(entity, 'metadata') or entity.metadata is None:
entity.metadata = {}
entity.metadata['doc_name'] = "document2"
logging.info(f"Получено {len(entities2)} сущностей из второго документа")
all_entities.extend(entities2)
logging.info(f"Всего получено {len(all_entities)} сущностей из обоих документов")
# Шаг 3: Сохранение результатов чанкинга в CSV
save_entities_to_csv(all_entities, output_csv_path)
# Шаг 4: Загрузка сущностей из CSV и выбор случайных чанков
loaded_entities = load_entities_from_csv(output_csv_path)
# Фильтрация только чанков
chunks = [e for e in loaded_entities if e.in_search_text is not None]
# Выбор случайных чанков (от 20 до 30)
num_chunks_to_select = min(random.randint(20, 30), len(chunks))
selected_chunks = random.sample(chunks, num_chunks_to_select)
logging.info(f"Выбрано {len(selected_chunks)} случайных чанков для сборки")
# Дополнительная статистика по документам
doc1_chunks = [c for c in selected_chunks if hasattr(c, 'metadata') and c.metadata.get('doc_name') == "document1"]
doc2_chunks = [c for c in selected_chunks if hasattr(c, 'metadata') and c.metadata.get('doc_name') == "document2"]
logging.info(f"Из них {len(doc1_chunks)} чанков из первого документа и {len(doc2_chunks)} из второго")
# Шаг 5: Создание InjectionBuilder с InMemoryEntityRepository
repository = InMemoryEntityRepository(loaded_entities)
builder = InjectionBuilder(repository=repository)
# Регистрация стратегии
builder.register_strategy("fixed_size", FixedSizeChunkingStrategy)
# Шаг 6: Сборка текста из выбранных чанков
logging.info("Начало сборки текста из выбранных чанков")
assembled_text = builder.build(selected_chunks)
# Шаг 7: Сохранение результата в файл
with open(output_text_path, "w", encoding="utf-8") as f:
f.write(assembled_text)
logging.info(f"Результат сборки сохранен в {output_text_path}")
# Вывод статистики
logging.info(f"Общее количество сущностей: {len(loaded_entities)}")
logging.info(f"Количество чанков: {len(chunks)}")
logging.info(f"Выбрано для сборки: {len(selected_chunks)}")
logging.info(f"Длина собранного текста: {len(assembled_text)} символов")
except Exception as e:
logging.error(f"Произошла ошибка: {e}", exc_info=True)
if __name__ == "__main__":
main()
|