Spaces:
Sleeping
Sleeping
File size: 16,912 Bytes
744a170 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 |
#!/usr/bin/env python
"""
Скрипт для визуального тестирования процесса чанкинга и сборки документа.
Этот скрипт:
1. Считывает test_input/test.docx с помощью UniversalParser
2. Чанкит документ через Destructurer с fixed_size-стратегией
3. Сохраняет результат чанкинга в test_output/test.csv
4. Выбирает 20-30 случайных чанков из CSV
5. Создает InjectionBuilder с InMemoryEntityRepository
6. Собирает текст из выбранных чанков
7. Сохраняет результат в test_output/test_builded.txt
"""
import json
import logging
import os
import random
from pathlib import Path
from typing import List
from uuid import UUID
import pandas as pd
from ntr_fileparser import UniversalParser
from ntr_text_fragmentation import (DocumentAsEntity, EntitiesExtractor,
InjectionBuilder, InMemoryEntityRepository,
LinkerEntity)
def setup_logging() -> None:
"""Настройка логгирования."""
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - [%(pathname)s:%(lineno)d] - %(message)s",
)
def ensure_directories() -> None:
"""Проверка наличия необходимых директорий."""
for directory in ["test_input", "test_output"]:
Path(directory).mkdir(parents=True, exist_ok=True)
def save_entities_to_csv(entities: List[LinkerEntity], csv_path: str) -> None:
"""
Сохраняет сущности в CSV файл.
Args:
entities: Список сущностей
csv_path: Путь для сохранения CSV файла
"""
data = []
for entity in entities:
# Базовые поля для всех типов сущностей
entity_dict = {
"id": str(entity.id),
"type": entity.type,
"name": entity.name,
"text": entity.text,
"metadata": json.dumps(entity.metadata or {}, ensure_ascii=False),
"in_search_text": entity.in_search_text,
"source_id": str(entity.source_id) if entity.source_id else None,
"target_id": str(entity.target_id) if entity.target_id else None,
"number_in_relation": entity.number_in_relation,
"groupper": entity.groupper,
"type": entity.type,
}
# Дополнительные поля специфичные для подклассов (если они есть в __dict__)
# Это не самый надежный способ, но для скрипта визуализации может подойти
# Сериализация LinkerEntity теперь должна сама класть доп поля в metadata
# for key, value in entity.__dict__.items():
# if key not in entity_dict and not key.startswith('_'):
# entity_dict[key] = value
data.append(entity_dict)
df = pd.DataFrame(data)
# Указываем кодировку UTF-8 при записи CSV
df.to_csv(csv_path, index=False, encoding='utf-8')
logging.info(f"Сохранено {len(entities)} сущностей в {csv_path}")
def load_entities_from_csv(csv_path: str) -> List[LinkerEntity]:
"""
Загружает сущности из CSV файла.
Args:
csv_path: Путь к CSV файлу
Returns:
Список сущностей
"""
df = pd.read_csv(csv_path)
entities = []
for _, row in df.iterrows():
# Обработка метаданных
metadata_str = row.get("metadata", "{}")
try:
# Используем json.loads для парсинга JSON строки
metadata = (
json.loads(metadata_str)
if pd.notna(metadata_str) and metadata_str
else {}
)
except json.JSONDecodeError: # Ловим ошибку JSON
logging.warning(
f"Не удалось распарсить метаданные JSON: {metadata_str}. Используется пустой словарь."
)
metadata = {}
# Общие поля для всех типов сущностей
# Преобразуем ID обратно в UUID
entity_id = row['id']
if isinstance(entity_id, str):
try:
entity_id = UUID(entity_id)
except ValueError:
logging.warning(
f"Неверный формат UUID для id: {entity_id}. Пропускаем сущность."
)
continue
common_args = {
"id": entity_id,
"name": row["name"] if pd.notna(row.get("name")) else "",
"text": row["text"] if pd.notna(row.get("text")) else "",
"metadata": metadata,
"in_search_text": (
row["in_search_text"] if pd.notna(row.get('in_search_text')) else None
),
"type": (
row["type"] if pd.notna(row.get('type')) else LinkerEntity.__name__
), # Используем базовый тип, если не указан
"groupper": row["groupper"] if pd.notna(row.get("groupper")) else None,
}
# Добавляем поля связи, если они есть, преобразуя в UUID
source_id_str = row.get("source_id")
target_id_str = row.get("target_id")
if pd.notna(source_id_str):
try:
common_args["source_id"] = UUID(source_id_str)
except ValueError:
logging.warning(
f"Неверный формат UUID для source_id: {source_id_str}. Пропускаем поле."
)
if pd.notna(target_id_str):
try:
common_args["target_id"] = UUID(target_id_str)
except ValueError:
logging.warning(
f"Неверный формат UUID для target_id: {target_id_str}. Пропускаем поле."
)
if pd.notna(row.get("number_in_relation")):
try:
common_args["number_in_relation"] = int(row["number_in_relation"])
except ValueError:
logging.warning(
f"Неверный формат для number_in_relation: {row['number_in_relation']}. Пропускаем поле."
)
# Пытаемся десериализовать в конкретный тип, если он известен
entity_class = LinkerEntity._entity_classes.get(
common_args["type"], LinkerEntity
)
try:
# Создаем экземпляр, передавая только те аргументы, которые ожидает класс
# (используя LinkerEntity._deserialize_to_me как пример, но нужно убедиться,
# что он принимает все нужные поля или имеет **kwargs)
# Пока создаем базовый LinkerEntity, т.к. подклассы могут требовать специфичные поля
# которых нет в CSV или в common_args
entity = LinkerEntity(**common_args)
# Если нужно строгое восстановление типов, потребуется более сложная логика
# с проверкой полей каждого подкласса
except TypeError as e:
logging.warning(
f"Ошибка создания экземпляра {entity_class.__name__} для ID {common_args['id']}: {e}. Создан базовый LinkerEntity."
)
entity = LinkerEntity(**common_args) # Откат к базовому классу
entities.append(entity)
logging.info(f"Загружено {len(entities)} сущностей из {csv_path}")
return entities
def main() -> None:
"""Основная функция скрипта."""
setup_logging()
ensure_directories()
# Пути к файлам
input_doc_path = "test_input/test2.docx"
output_csv_path = "test_output/test2.csv"
output_text_path = "test_output/test2.md"
# Проверка наличия входного файла
if not os.path.exists(input_doc_path):
logging.error(f"Файл {input_doc_path} не найден!")
return
logging.info(f"Парсинг документа {input_doc_path}")
try:
# Шаг 1: Парсинг документа дважды, как если бы это были два разных документа
parser = UniversalParser()
document1 = parser.parse_by_path(input_doc_path)
document2 = parser.parse_by_path(input_doc_path)
# Меняем название второго документа, чтобы отличить его
document2.name = document2.name + "_copy" if document2.name else "copy_doc"
# Шаг 2: Чанкинг и извлечение таблиц с использованием EntitiesExtractor
all_entities = []
# Обработка первого документа
logging.info("Начало процесса деструктуризации первого документа")
# Инициализируем экстрактор без документа (используем дефолтные настройки или настроим позже)
extractor1 = EntitiesExtractor()
# Настройка чанкинга
extractor1.configure_chunking(
strategy_name="fixed_size",
strategy_params={
"words_per_chunk": 50,
"overlap_words": 25,
"respect_sentence_boundaries": True, # Добавлено по запросу
},
)
# Настройка извлечения таблиц
extractor1.configure_tables_extraction(process_tables=True)
# Выполнение деструктуризации
entities1 = extractor1.extract(document1)
# Находим ID документа 1
doc1_entity = next((e for e in entities1 if e.type == DocumentAsEntity.__name__), None)
if not doc1_entity:
logging.error("Не удалось найти DocumentAsEntity для первого документа!")
return
doc1_id = doc1_entity.id
logging.info(f"ID первого документа: {doc1_id}")
logging.info(f"Получено {len(entities1)} сущностей из первого документа")
all_entities.extend(entities1)
# Обработка второго документа
logging.info("Начало процесса деструктуризации второго документа")
# Инициализируем экстрактор без документа
extractor2 = EntitiesExtractor()
# Настройка чанкинга (те же параметры)
extractor2.configure_chunking(
strategy_name="fixed_size",
strategy_params={
"words_per_chunk": 50,
"overlap_words": 25,
"respect_sentence_boundaries": True,
},
)
# Настройка извлечения таблиц
extractor2.configure_tables_extraction(process_tables=True)
# Выполнение деструктуризации
entities2 = extractor2.extract(document2)
# Находим ID документа 2
doc2_entity = next((e for e in entities2 if e.type == DocumentAsEntity.__name__), None)
if not doc2_entity:
logging.error("Не удалось найти DocumentAsEntity для второго документа!")
return
doc2_id = doc2_entity.id
logging.info(f"ID второго документа: {doc2_id}")
logging.info(f"Получено {len(entities2)} сущностей из второго документа")
all_entities.extend(entities2)
logging.info(
f"Всего получено {len(all_entities)} сущностей из обоих документов"
)
# Шаг 3: Сохранение результатов чанкинга в CSV
save_entities_to_csv(all_entities, output_csv_path)
# Шаг 4: Загрузка сущностей из CSV и выбор случайных чанков
loaded_entities = load_entities_from_csv(output_csv_path)
# Шаг 5: Создание InjectionBuilder с InMemoryEntityRepository
# Сначала создаем репозиторий со ВСЕМИ загруженными сущностями
repository = InMemoryEntityRepository(loaded_entities)
builder = InjectionBuilder(repository=repository)
# Фильтрация только чанков (сущностей с in_search_text)
# Убедимся, что работаем с десериализованными сущностями из репозитория
# (Репозиторий уже десериализует при инициализации, если нужно)
all_entities_from_repo = repository.get_entities_by_ids(
[e.id for e in loaded_entities]
)
# Выбираем все сущности с in_search_text
selectable_entities = [
e for e in all_entities_from_repo if e.in_search_text is not None
]
# Выбор случайных сущностей (от 20 до 30, но не более доступных)
num_entities_to_select = min(random.randint(100, 500), len(selectable_entities))
if num_entities_to_select > 0:
selected_entities = random.sample(
selectable_entities, num_entities_to_select
)
selected_ids = [entity.id for entity in selected_entities]
logging.info(
f"Выбрано {len(selected_ids)} случайных ID сущностей (с in_search_text) для сборки"
)
# Дополнительная статистика по документам
# Используем репозиторий для получения информации о владельцах
selected_entities_details = repository.get_entities_by_ids(selected_ids)
# Считаем на основе owner_id
doc1_entities_count = sum(1 for e in selected_entities_details if e.owner_id == doc1_id)
doc2_entities_count = sum(1 for e in selected_entities_details if e.owner_id == doc2_id)
other_owner_count = len(selected_entities_details) - (doc1_entities_count + doc2_entities_count)
logging.info(
f"Из них {doc1_entities_count} принадлежат первому документу (ID: {doc1_id}), "
f"{doc2_entities_count} второму (ID: {doc2_id}) (на основе owner_id). "
f"{other_owner_count} имеют другого владельца (вероятно, таблицы/строки)."
)
else:
logging.warning("Не найдено сущностей с in_search_text для выбора.")
selected_ids = []
selected_entities = [] # Добавлено для ясности
# Шаг 6: Сборка текста из выбранных ID
logging.info("Начало сборки текста из выбранных ID")
# Передаем ID, а не сущности, т.к. builder сам их получит из репозитория
assembled_text = builder.build(
selected_ids, include_tables=True
) # Включаем таблицы
# Шаг 7: Сохранение результата в файл
with open(output_text_path, "w", encoding="utf-8") as f:
f.write(assembled_text.replace('\n', '\n\n'))
logging.info(f"Результат сборки сохранен в {output_text_path}")
except Exception as e:
logging.error(f"Произошла ошибка: {e}", exc_info=True)
if __name__ == "__main__":
main()
|