#!/usr/bin/env python """ Скрипт для подготовки датасета с вопросами и текстами пунктов/приложений. Преобразует исходный датасет, содержащий списки пунктов, в расширенный датасет, где каждому пункту/приложению соответствует отдельная строка. """ import argparse import sys from pathlib import Path from typing import Any, Dict import pandas as pd from tqdm import tqdm from ntr_text_fragmentation import Destructurer sys.path.insert(0, str(Path(__file__).parent.parent)) from ntr_fileparser import UniversalParser def parse_args(): """ Парсит аргументы командной строки. Returns: Аргументы командной строки """ parser = argparse.ArgumentParser(description="Подготовка датасета с текстами пунктов") parser.add_argument('--input-dataset', type=str, default='data/dataset.xlsx', help='Путь к исходному датасету (Excel-файл)') parser.add_argument('--output-dataset', type=str, default='data/dataset_with_texts.xlsx', help='Путь для сохранения подготовленного датасета (Excel-файл)') parser.add_argument('--data-folder', type=str, default='data/docs', help='Путь к папке с документами') parser.add_argument('--debug', action='store_true', help='Включить режим отладки с дополнительным выводом информации') return parser.parse_args() def load_dataset(file_path: str, debug: bool = False) -> pd.DataFrame: """ Загружает исходный датасет с вопросами. Args: file_path: Путь к Excel-файлу debug: Режим отладки Returns: DataFrame с вопросами """ print(f"Загрузка исходного датасета из {file_path}...") df = pd.read_excel(file_path) # Преобразуем строковые списки в настоящие списки for col in ['puncts', 'appendices']: if col in df.columns: df[col] = df[col].apply(lambda x: eval(x) if isinstance(x, str) and x.strip() else ([] if pd.isna(x) else x)) # Вывод отладочной информации о форматах пунктов/приложений if debug: all_puncts = set() all_appendices = set() for _, row in df.iterrows(): if 'puncts' in row and row['puncts']: all_puncts.update(row['puncts']) if 'appendices' in row and row['appendices']: all_appendices.update(row['appendices']) print(f"\nУникальные форматы пунктов в датасете ({len(all_puncts)}):") for i, p in enumerate(sorted(all_puncts)): if i < 20 or i > len(all_puncts) - 20: print(f" - {repr(p)}") elif i == 20: print(" ... (пропущено)") print(f"\nУникальные форматы приложений в датасете ({len(all_appendices)}):") for app in sorted(all_appendices): print(f" - {repr(app)}") print(f"Загружено {len(df)} вопросов") return df def read_documents(folder_path: str) -> Dict[str, Any]: """ Читает все документы из указанной папки. Args: folder_path: Путь к папке с документами Returns: Словарь {имя_файла: parsed_document} """ print(f"Чтение документов из {folder_path}...") parser = UniversalParser() documents = {} for file_path in tqdm(list(Path(folder_path).glob("*.docx")), desc="Чтение документов"): try: doc_name = file_path.stem documents[doc_name] = parser.parse_by_path(str(file_path)) except Exception as e: print(f"Ошибка при чтении файла {file_path}: {e}") print(f"Прочитано {len(documents)} документов") return documents def normalize_punct_format(punct: str) -> str: """ Нормализует формат номера пункта для единообразного сравнения. Args: punct: Номер пункта Returns: Нормализованный номер пункта """ # Убираем пробелы punct = punct.strip() # Убираем завершающую точку, если она есть if punct.endswith('.'): punct = punct[:-1] return punct def normalize_appendix_format(appendix: str) -> str: """ Нормализует формат номера приложения для единообразного сравнения. Args: appendix: Номер приложения Returns: Нормализованный номер приложения """ # Убираем пробелы appendix = appendix.strip() # Обработка форматов с дефисом (например, "14-1") if "-" in appendix: return appendix return appendix def find_matching_key(search_key, available_keys, item_type='punct', debug_mode=False): """ Ищет наиболее подходящий ключ среди доступных ключей с учетом типа элемента Args: search_key: Ключ для поиска available_keys: Доступные ключи item_type: Тип элемента ('punct' или 'appendix') debug_mode: Режим отладки Returns: Найденный ключ или None """ if not available_keys: return None # Нормализуем ключ в зависимости от типа элемента if item_type == 'punct': normalized_search_key = normalize_punct_format(search_key) else: # appendix normalized_search_key = normalize_appendix_format(search_key) # Проверяем прямое совпадение ключей for key in available_keys: if item_type == 'punct': normalized_key = normalize_punct_format(key) else: # appendix normalized_key = normalize_appendix_format(key) if normalized_key == normalized_search_key: if debug_mode: print(f"Найдено прямое совпадение для {item_type} {search_key} -> {key}") return key # Если прямого совпадения нет, проверяем "мягкое" совпадение # Только для пунктов, не для приложений if item_type == 'punct': for key in available_keys: normalized_key = normalize_punct_format(key) # Если ключ содержит "/", это подпункт приложения, его не следует сопоставлять с обычным пунктом if '/' in key and '/' not in search_key: continue # Проверяем совпадение конца номера (например, "1.2" и "1.2.") if normalized_key.rstrip('.') == normalized_search_key.rstrip('.'): if debug_mode: print(f"Найдено мягкое совпадение для {search_key} -> {key}") return key return None def extract_item_texts(documents, debug_mode=False): """ Извлекает тексты пунктов и приложений из документов. Args: documents: Словарь с распарсенными документами {doc_name: document} debug_mode: Включать ли режим отладки Returns: Словарь с текстами пунктов и приложений, организованный по названиям документов """ print("Извлечение текстов пунктов и приложений...") item_texts = {} all_extracted_items = set() all_extracted_appendices = set() for doc_name, document in tqdm(documents.items(), desc="Применение стратегии numbered_items"): # Используем стратегию numbered_items с режимом отладки destructurer = Destructurer(document) destructurer.configure('numbered_items', debug_mode=debug_mode) entities, _ = destructurer.destructure() # Инициализируем структуру для документа, если она еще не создана if doc_name not in item_texts: item_texts[doc_name] = { 'puncts': {}, # Для пунктов основного текста 'appendices': {} # Для приложений } for entity in entities: # Пропускаем сущность документа if entity.type == "Document": continue # Работаем только с чанками для поиска if hasattr(entity, 'use_in_search') and entity.use_in_search: metadata = entity.metadata text = entity.text # Для пунктов if 'item_number' in metadata: item_number = metadata['item_number'] # Проверяем, является ли пункт подпунктом приложения if 'appendix_number' in metadata: # Это подпункт приложения appendix_number = metadata['appendix_number'] # Создаем структуру для приложения, если ее еще нет if appendix_number not in item_texts[doc_name]['appendices']: item_texts[doc_name]['appendices'][appendix_number] = { 'main_text': '', # Основной текст приложения 'subpuncts': {} # Подпункты приложения } # Добавляем подпункт в словарь подпунктов item_texts[doc_name]['appendices'][appendix_number]['subpuncts'][item_number] = text if debug_mode: print(f"Извлечен подпункт {item_number} приложения {appendix_number} из {doc_name}") all_extracted_items.add(item_number) else: # Обычный пункт item_texts[doc_name]['puncts'][item_number] = text if debug_mode: print(f"Извлечен пункт {item_number} из {doc_name}") all_extracted_items.add(item_number) # Для приложений elif 'appendix_number' in metadata and 'item_number' not in metadata: appendix_number = metadata['appendix_number'] # Создаем структуру для приложения, если ее еще нет if appendix_number not in item_texts[doc_name]['appendices']: item_texts[doc_name]['appendices'][appendix_number] = { 'main_text': text, # Основной текст приложения 'subpuncts': {} # Подпункты приложения } else: # Если приложение уже существует, обновляем основной текст item_texts[doc_name]['appendices'][appendix_number]['main_text'] = text if debug_mode: print(f"Извлечено приложение {appendix_number} из {doc_name}") all_extracted_appendices.add(appendix_number) # Выводим статистику, если включен режим отладки if debug_mode: print(f"\nВсего извлечено уникальных пунктов: {len(all_extracted_items)}") print(f"Примеры форматов пунктов: {', '.join(sorted(list(all_extracted_items))[:20])}") print(f"\nВсего извлечено уникальных приложений: {len(all_extracted_appendices)}") print(f"Форматы приложений: {', '.join(sorted(list(all_extracted_appendices)))}") # Подсчитываем общее количество пунктов и приложений total_puncts = sum(len(doc_data['puncts']) for doc_data in item_texts.values()) total_appendices = sum(len(doc_data['appendices']) for doc_data in item_texts.values()) print(f"Извлечено {total_puncts} пунктов и {total_appendices} приложений из {len(item_texts)} документов") return item_texts def is_subpunct(parent_punct: str, possible_subpunct: str) -> bool: """ Проверяет, является ли пункт подпунктом другого пункта. Args: parent_punct: Родительский пункт (например, "14") possible_subpunct: Возможный подпункт (например, "14.1") Returns: True, если possible_subpunct является подпунктом parent_punct """ # Нормализуем пункты parent = normalize_punct_format(parent_punct) child = normalize_punct_format(possible_subpunct) # Проверяем, начинается ли child с parent и после него идет точка или другой разделитель if child.startswith(parent): # Если длины равны, это тот же самый пункт if len(child) == len(parent): return False # Проверяем символ после parent - должна быть точка (дефис исключен, т.к. это разные пункты) next_char = child[len(parent)] return next_char in ['.'] return False def collect_subpuncts(punct: str, all_puncts: dict) -> dict: """ Собирает все подпункты для указанного пункта. Args: punct: Пункт, для которого нужно найти подпункты (например, "14") all_puncts: Словарь всех пунктов {punct: text} Returns: Словарь {punct: text} с пунктом и всеми его подпунктами """ result = {} normalized_punct = normalize_punct_format(punct) # Добавляем сам пункт, если он существует if normalized_punct in all_puncts: result[normalized_punct] = all_puncts[normalized_punct] # Ищем подпункты for possible_subpunct in all_puncts.keys(): if is_subpunct(normalized_punct, possible_subpunct): result[possible_subpunct] = all_puncts[possible_subpunct] return result def prepare_expanded_dataset(df, item_texts, output_path, debug_mode=False): """ Подготавливает расширенный датасет, добавляя тексты пунктов и приложений. Args: df: Исходный датасет item_texts: Словарь с текстами пунктов и приложений output_path: Путь для сохранения расширенного датасета debug_mode: Включать ли режим отладки Returns: Датафрейм с расширенным датасетом """ rows = [] skipped_items = 0 total_items = 0 for _, row in df.iterrows(): question_id = row['id'] question = row['question'] filepath = row.get('filepath', '') # Получаем имя файла без пути doc_name = Path(filepath).stem if filepath else '' # Пропускаем, если файл не найден if not doc_name or doc_name not in item_texts: if debug_mode and doc_name: print(f"Документ {doc_name} не найден в извлеченных данных") continue # Обрабатываем пункты puncts = row.get('puncts', []) if isinstance(puncts, str) and puncts.strip(): # Преобразуем строковое представление в список try: puncts = eval(puncts) except: puncts = [] if not isinstance(puncts, list): puncts = [] for punct in puncts: total_items += 1 if debug_mode: print(f"\nОбработка пункта {punct} для вопроса {question_id} из {doc_name}") # Ищем соответствующий пункт в документе available_keys = list(item_texts[doc_name]['puncts'].keys()) matching_key = find_matching_key(punct, available_keys, 'punct', debug_mode) if matching_key: # Сохраняем основной текст пункта item_text = item_texts[doc_name]['puncts'][matching_key] # Список всех включенных ключей (для отслеживания что было приконкатенировано) matched_keys = [matching_key] # Ищем все подпункты для этого пункта subpuncts = {} for key in available_keys: if is_subpunct(matching_key, key): subpuncts[key] = item_texts[doc_name]['puncts'][key] matched_keys.append(key) # Если есть подпункты, добавляем их к основному тексту if subpuncts: # Сортируем подпункты по номеру sorted_subpuncts = sorted(subpuncts.items(), key=lambda x: x[0]) # Добавляем разделитель и все подпункты combined_text = item_text for key, subtext in sorted_subpuncts: combined_text += f"\n\n{key} {subtext}" item_text = combined_text # Добавляем строку с пунктом и его подпунктами rows.append({ 'id': question_id, 'question': question, 'filename': doc_name, 'text': item_text, 'item_type': 'punct', 'item_id': punct, 'matching_keys': ", ".join(matched_keys) }) if debug_mode: print(f"Добавлен пункт {matching_key} для {question_id} с {len(matched_keys)} ключами") if len(matched_keys) > 1: print(f" Включены ключи: {', '.join(matched_keys)}") else: skipped_items += 1 if debug_mode: print(f"Не найден соответствующий пункт для {punct} в {doc_name}") # Обрабатываем приложения appendices = row.get('appendices', []) if isinstance(appendices, str) and appendices.strip(): # Преобразуем строковое представление в список try: appendices = eval(appendices) except: appendices = [] if not isinstance(appendices, list): appendices = [] for appendix in appendices: total_items += 1 if debug_mode: print(f"\nОбработка приложения {appendix} для вопроса {question_id} из {doc_name}") # Ищем соответствующее приложение в документе available_keys = list(item_texts[doc_name]['appendices'].keys()) matching_key = find_matching_key(appendix, available_keys, 'appendix', debug_mode) if matching_key: appendix_content = item_texts[doc_name]['appendices'][matching_key] # Список всех включенных ключей (для отслеживания что было приконкатенировано) matched_keys = [matching_key] # Формируем полный текст приложения, включая все подпункты if isinstance(appendix_content, dict): # Начинаем с основного текста full_text = appendix_content.get('main_text', '') # Добавляем все подпункты в отсортированном порядке if 'subpuncts' in appendix_content and appendix_content['subpuncts']: subpuncts = appendix_content['subpuncts'] sorted_subpuncts = sorted(subpuncts.items(), key=lambda x: x[0]) # Добавляем разделитель, если есть основной текст if full_text: full_text += "\n\n" # Добавляем все подпункты for i, (key, subtext) in enumerate(sorted_subpuncts): matched_keys.append(f"{matching_key}/{key}") if i > 0: full_text += "\n\n" full_text += f"{key} {subtext}" else: # Если приложение просто строка full_text = appendix_content # Добавляем строку с приложением rows.append({ 'id': question_id, 'question': question, 'filename': doc_name, 'text': full_text, 'item_type': 'appendix', 'item_id': appendix, 'matching_keys': ", ".join(matched_keys) }) if debug_mode: print(f"Добавлено приложение {matching_key} для {question_id} с {len(matched_keys)} ключами") if len(matched_keys) > 1: print(f" Включены ключи: {', '.join(matched_keys)}") else: skipped_items += 1 if debug_mode: print(f"Не найдено соответствующее приложение для {appendix} в {doc_name}") extended_df = pd.DataFrame(rows) # Сохраняем расширенный датасет extended_df.to_excel(output_path, index=False) print(f"Расширенный датасет сохранен в {output_path}") print(f"Всего обработано элементов: {total_items}") print(f"Всего элементов в расширенном датасете: {len(extended_df)}") print(f"Пропущено элементов из-за отсутствия соответствия: {skipped_items}") return extended_df def main(): # Парсим аргументы командной строки args = parse_args() # Определяем режим отладки debug = args.debug # Загружаем исходный датасет df = load_dataset(args.input_dataset, debug) # Читаем документы documents = read_documents(args.data_folder) # Извлекаем тексты пунктов и приложений item_texts = extract_item_texts(documents, debug) # Подготавливаем расширенный датасет expanded_df = prepare_expanded_dataset(df, item_texts, args.output_dataset, debug) print("Готово!") if __name__ == "__main__": main()