Spaces:
Sleeping
Sleeping
#!/usr/bin/env python | |
""" | |
Скрипт для подготовки датасета с вопросами и текстами пунктов/приложений. | |
Преобразует исходный датасет, содержащий списки пунктов, в расширенный датасет, | |
где каждому пункту/приложению соответствует отдельная строка. | |
""" | |
import argparse | |
import sys | |
from pathlib import Path | |
from typing import Any, Dict | |
import pandas as pd | |
from tqdm import tqdm | |
from ntr_text_fragmentation import Destructurer | |
sys.path.insert(0, str(Path(__file__).parent.parent)) | |
from ntr_fileparser import UniversalParser | |
def parse_args(): | |
""" | |
Парсит аргументы командной строки. | |
Returns: | |
Аргументы командной строки | |
""" | |
parser = argparse.ArgumentParser(description="Подготовка датасета с текстами пунктов") | |
parser.add_argument('--input-dataset', type=str, default='data/dataset.xlsx', | |
help='Путь к исходному датасету (Excel-файл)') | |
parser.add_argument('--output-dataset', type=str, default='data/dataset_with_texts.xlsx', | |
help='Путь для сохранения подготовленного датасета (Excel-файл)') | |
parser.add_argument('--data-folder', type=str, default='data/docs', | |
help='Путь к папке с документами') | |
parser.add_argument('--debug', action='store_true', | |
help='Включить режим отладки с дополнительным выводом информации') | |
return parser.parse_args() | |
def load_dataset(file_path: str, debug: bool = False) -> pd.DataFrame: | |
""" | |
Загружает исходный датасет с вопросами. | |
Args: | |
file_path: Путь к Excel-файлу | |
debug: Режим отладки | |
Returns: | |
DataFrame с вопросами | |
""" | |
print(f"Загрузка исходного датасета из {file_path}...") | |
df = pd.read_excel(file_path) | |
# Преобразуем строковые списки в настоящие списки | |
for col in ['puncts', 'appendices']: | |
if col in df.columns: | |
df[col] = df[col].apply(lambda x: | |
eval(x) if isinstance(x, str) and x.strip() | |
else ([] if pd.isna(x) else x)) | |
# Вывод отладочной информации о форматах пунктов/приложений | |
if debug: | |
all_puncts = set() | |
all_appendices = set() | |
for _, row in df.iterrows(): | |
if 'puncts' in row and row['puncts']: | |
all_puncts.update(row['puncts']) | |
if 'appendices' in row and row['appendices']: | |
all_appendices.update(row['appendices']) | |
print(f"\nУникальные форматы пунктов в датасете ({len(all_puncts)}):") | |
for i, p in enumerate(sorted(all_puncts)): | |
if i < 20 or i > len(all_puncts) - 20: | |
print(f" - {repr(p)}") | |
elif i == 20: | |
print(" ... (пропущено)") | |
print(f"\nУникальные форматы приложений в датасете ({len(all_appendices)}):") | |
for app in sorted(all_appendices): | |
print(f" - {repr(app)}") | |
print(f"Загружено {len(df)} вопросов") | |
return df | |
def read_documents(folder_path: str) -> Dict[str, Any]: | |
""" | |
Читает все документы из указанной папки. | |
Args: | |
folder_path: Путь к папке с документами | |
Returns: | |
Словарь {имя_файла: parsed_document} | |
""" | |
print(f"Чтение документов из {folder_path}...") | |
parser = UniversalParser() | |
documents = {} | |
for file_path in tqdm(list(Path(folder_path).glob("*.docx")), desc="Чтение документов"): | |
try: | |
doc_name = file_path.stem | |
documents[doc_name] = parser.parse_by_path(str(file_path)) | |
except Exception as e: | |
print(f"Ошибка при чтении файла {file_path}: {e}") | |
print(f"Прочитано {len(documents)} документов") | |
return documents | |
def normalize_punct_format(punct: str) -> str: | |
""" | |
Нормализует формат номера пункта для единообразного сравнения. | |
Args: | |
punct: Номер пункта | |
Returns: | |
Нормализованный номер пункта | |
""" | |
# Убираем пробелы | |
punct = punct.strip() | |
# Убираем завершающую точку, если она есть | |
if punct.endswith('.'): | |
punct = punct[:-1] | |
return punct | |
def normalize_appendix_format(appendix: str) -> str: | |
""" | |
Нормализует формат номера приложения для единообразного сравнения. | |
Args: | |
appendix: Номер приложения | |
Returns: | |
Нормализованный номер приложения | |
""" | |
# Убираем пробелы | |
appendix = appendix.strip() | |
# Обработка форматов с дефисом (например, "14-1") | |
if "-" in appendix: | |
return appendix | |
return appendix | |
def find_matching_key(search_key, available_keys, item_type='punct', debug_mode=False): | |
""" | |
Ищет наиболее подходящий ключ среди доступных ключей с учетом типа элемента | |
Args: | |
search_key: Ключ для поиска | |
available_keys: Доступные ключи | |
item_type: Тип элемента ('punct' или 'appendix') | |
debug_mode: Режим отладки | |
Returns: | |
Найденный ключ или None | |
""" | |
if not available_keys: | |
return None | |
# Нормализуем ключ в зависимости от типа элемента | |
if item_type == 'punct': | |
normalized_search_key = normalize_punct_format(search_key) | |
else: # appendix | |
normalized_search_key = normalize_appendix_format(search_key) | |
# Проверяем прямое совпадение ключей | |
for key in available_keys: | |
if item_type == 'punct': | |
normalized_key = normalize_punct_format(key) | |
else: # appendix | |
normalized_key = normalize_appendix_format(key) | |
if normalized_key == normalized_search_key: | |
if debug_mode: | |
print(f"Найдено прямое совпадение для {item_type} {search_key} -> {key}") | |
return key | |
# Если прямого совпадения нет, проверяем "мягкое" совпадение | |
# Только для пунктов, не для приложений | |
if item_type == 'punct': | |
for key in available_keys: | |
normalized_key = normalize_punct_format(key) | |
# Если ключ содержит "/", это подпункт приложения, его не следует сопоставлять с обычным пунктом | |
if '/' in key and '/' not in search_key: | |
continue | |
# Проверяем совпадение конца номера (например, "1.2" и "1.2.") | |
if normalized_key.rstrip('.') == normalized_search_key.rstrip('.'): | |
if debug_mode: | |
print(f"Найдено мягкое совпадение для {search_key} -> {key}") | |
return key | |
return None | |
def extract_item_texts(documents, debug_mode=False): | |
""" | |
Извлекает тексты пунктов и приложений из документов. | |
Args: | |
documents: Словарь с распарсенными документами {doc_name: document} | |
debug_mode: Включать ли режим отладки | |
Returns: | |
Словарь с текстами пунктов и приложений, организованный по названиям документов | |
""" | |
print("Извлечение текстов пунктов и приложений...") | |
item_texts = {} | |
all_extracted_items = set() | |
all_extracted_appendices = set() | |
for doc_name, document in tqdm(documents.items(), desc="Применение стратегии numbered_items"): | |
# Используем стратегию numbered_items с режимом отладки | |
destructurer = Destructurer(document) | |
destructurer.configure('numbered_items', debug_mode=debug_mode) | |
entities, _ = destructurer.destructure() | |
# Инициализируем структуру для документа, если она еще не создана | |
if doc_name not in item_texts: | |
item_texts[doc_name] = { | |
'puncts': {}, # Для пунктов основного текста | |
'appendices': {} # Для приложений | |
} | |
for entity in entities: | |
# Пропускаем сущность документа | |
if entity.type == "Document": | |
continue | |
# Работаем только с чанками для поиска | |
if hasattr(entity, 'use_in_search') and entity.use_in_search: | |
metadata = entity.metadata | |
text = entity.text | |
# Для пунктов | |
if 'item_number' in metadata: | |
item_number = metadata['item_number'] | |
# Проверяем, является ли пункт подпунктом приложения | |
if 'appendix_number' in metadata: | |
# Это подпункт приложения | |
appendix_number = metadata['appendix_number'] | |
# Создаем структуру для приложения, если ее еще нет | |
if appendix_number not in item_texts[doc_name]['appendices']: | |
item_texts[doc_name]['appendices'][appendix_number] = { | |
'main_text': '', # Основной текст приложения | |
'subpuncts': {} # Подпункты приложения | |
} | |
# Добавляем подпункт в словарь подпунктов | |
item_texts[doc_name]['appendices'][appendix_number]['subpuncts'][item_number] = text | |
if debug_mode: | |
print(f"Извлечен подпункт {item_number} приложения {appendix_number} из {doc_name}") | |
all_extracted_items.add(item_number) | |
else: | |
# Обычный пункт | |
item_texts[doc_name]['puncts'][item_number] = text | |
if debug_mode: | |
print(f"Извлечен пункт {item_number} из {doc_name}") | |
all_extracted_items.add(item_number) | |
# Для приложений | |
elif 'appendix_number' in metadata and 'item_number' not in metadata: | |
appendix_number = metadata['appendix_number'] | |
# Создаем структуру для приложения, если ее еще нет | |
if appendix_number not in item_texts[doc_name]['appendices']: | |
item_texts[doc_name]['appendices'][appendix_number] = { | |
'main_text': text, # Основной текст приложения | |
'subpuncts': {} # Подпункты приложения | |
} | |
else: | |
# Если приложение уже существует, обновляем основной текст | |
item_texts[doc_name]['appendices'][appendix_number]['main_text'] = text | |
if debug_mode: | |
print(f"Извлечено приложение {appendix_number} из {doc_name}") | |
all_extracted_appendices.add(appendix_number) | |
# Выводим статистику, если включен режим отладки | |
if debug_mode: | |
print(f"\nВсего извлечено уникальных пунктов: {len(all_extracted_items)}") | |
print(f"Примеры форматов пунктов: {', '.join(sorted(list(all_extracted_items))[:20])}") | |
print(f"\nВсего извлечено уникальных приложений: {len(all_extracted_appendices)}") | |
print(f"Форматы приложений: {', '.join(sorted(list(all_extracted_appendices)))}") | |
# Подсчитываем общее количество пунктов и приложений | |
total_puncts = sum(len(doc_data['puncts']) for doc_data in item_texts.values()) | |
total_appendices = sum(len(doc_data['appendices']) for doc_data in item_texts.values()) | |
print(f"Извлечено {total_puncts} пунктов и {total_appendices} приложений из {len(item_texts)} документов") | |
return item_texts | |
def is_subpunct(parent_punct: str, possible_subpunct: str) -> bool: | |
""" | |
Проверяет, является ли пункт подпунктом другого пункта. | |
Args: | |
parent_punct: Родительский пункт (например, "14") | |
possible_subpunct: Возможный подпункт (например, "14.1") | |
Returns: | |
True, если possible_subpunct является подпунктом parent_punct | |
""" | |
# Нормализуем пункты | |
parent = normalize_punct_format(parent_punct) | |
child = normalize_punct_format(possible_subpunct) | |
# Проверяем, начинается ли child с parent и после него идет точка или другой разделитель | |
if child.startswith(parent): | |
# Если длины равны, это тот же самый пункт | |
if len(child) == len(parent): | |
return False | |
# Проверяем символ после parent - должна быть точка (дефис исключен, т.к. это разные пункты) | |
next_char = child[len(parent)] | |
return next_char in ['.'] | |
return False | |
def collect_subpuncts(punct: str, all_puncts: dict) -> dict: | |
""" | |
Собирает все подпункты для указанного пункта. | |
Args: | |
punct: Пункт, для которого нужно найти подпункты (например, "14") | |
all_puncts: Словарь всех пунктов {punct: text} | |
Returns: | |
Словарь {punct: text} с пунктом и всеми его подпунктами | |
""" | |
result = {} | |
normalized_punct = normalize_punct_format(punct) | |
# Добавляем сам пункт, если он существует | |
if normalized_punct in all_puncts: | |
result[normalized_punct] = all_puncts[normalized_punct] | |
# Ищем подпункты | |
for possible_subpunct in all_puncts.keys(): | |
if is_subpunct(normalized_punct, possible_subpunct): | |
result[possible_subpunct] = all_puncts[possible_subpunct] | |
return result | |
def prepare_expanded_dataset(df, item_texts, output_path, debug_mode=False): | |
""" | |
Подготавливает расширенный датасет, добавляя тексты пунктов и приложений. | |
Args: | |
df: Исходный датасет | |
item_texts: Словарь с текстами пунктов и приложений | |
output_path: Путь для сохранения расширенного датасета | |
debug_mode: Включать ли режим отладки | |
Returns: | |
Датафрейм с расширенным датасетом | |
""" | |
rows = [] | |
skipped_items = 0 | |
total_items = 0 | |
for _, row in df.iterrows(): | |
question_id = row['id'] | |
question = row['question'] | |
filepath = row.get('filepath', '') | |
# Получаем имя файла без пути | |
doc_name = Path(filepath).stem if filepath else '' | |
# Пропускаем, если файл не найден | |
if not doc_name or doc_name not in item_texts: | |
if debug_mode and doc_name: | |
print(f"Документ {doc_name} не найден в извлеченных данных") | |
continue | |
# Обрабатываем пункты | |
puncts = row.get('puncts', []) | |
if isinstance(puncts, str) and puncts.strip(): | |
# Преобразуем строковое представление в список | |
try: | |
puncts = eval(puncts) | |
except: | |
puncts = [] | |
if not isinstance(puncts, list): | |
puncts = [] | |
for punct in puncts: | |
total_items += 1 | |
if debug_mode: | |
print(f"\nОбработка пункта {punct} для вопроса {question_id} из {doc_name}") | |
# Ищем соответствующий пункт в документе | |
available_keys = list(item_texts[doc_name]['puncts'].keys()) | |
matching_key = find_matching_key(punct, available_keys, 'punct', debug_mode) | |
if matching_key: | |
# Сохраняем основной текст пункта | |
item_text = item_texts[doc_name]['puncts'][matching_key] | |
# Список всех включенных ключей (для отслеживания что было приконкатенировано) | |
matched_keys = [matching_key] | |
# Ищем все подпункты для этого пункта | |
subpuncts = {} | |
for key in available_keys: | |
if is_subpunct(matching_key, key): | |
subpuncts[key] = item_texts[doc_name]['puncts'][key] | |
matched_keys.append(key) | |
# Если есть подпункты, добавляем их к основному тексту | |
if subpuncts: | |
# Сортируем подпункты по номеру | |
sorted_subpuncts = sorted(subpuncts.items(), key=lambda x: x[0]) | |
# Добавляем разделитель и все подпункты | |
combined_text = item_text | |
for key, subtext in sorted_subpuncts: | |
combined_text += f"\n\n{key} {subtext}" | |
item_text = combined_text | |
# Добавляем строку с пунктом и его подпунктами | |
rows.append({ | |
'id': question_id, | |
'question': question, | |
'filename': doc_name, | |
'text': item_text, | |
'item_type': 'punct', | |
'item_id': punct, | |
'matching_keys': ", ".join(matched_keys) | |
}) | |
if debug_mode: | |
print(f"Добавлен пункт {matching_key} для {question_id} с {len(matched_keys)} ключами") | |
if len(matched_keys) > 1: | |
print(f" Включены ключи: {', '.join(matched_keys)}") | |
else: | |
skipped_items += 1 | |
if debug_mode: | |
print(f"Не найден соответствующий пункт для {punct} в {doc_name}") | |
# Обрабатываем приложения | |
appendices = row.get('appendices', []) | |
if isinstance(appendices, str) and appendices.strip(): | |
# Преобразуем строковое представление в список | |
try: | |
appendices = eval(appendices) | |
except: | |
appendices = [] | |
if not isinstance(appendices, list): | |
appendices = [] | |
for appendix in appendices: | |
total_items += 1 | |
if debug_mode: | |
print(f"\nОбработка приложения {appendix} для вопроса {question_id} из {doc_name}") | |
# Ищем соответствующее приложение в документе | |
available_keys = list(item_texts[doc_name]['appendices'].keys()) | |
matching_key = find_matching_key(appendix, available_keys, 'appendix', debug_mode) | |
if matching_key: | |
appendix_content = item_texts[doc_name]['appendices'][matching_key] | |
# Список всех включенных ключей (для отслеживания что было приконкатенировано) | |
matched_keys = [matching_key] | |
# Формируем полный текст приложения, включая все подпункты | |
if isinstance(appendix_content, dict): | |
# Начинаем с основного текста | |
full_text = appendix_content.get('main_text', '') | |
# Добавляем все подпункты в отсортированном порядке | |
if 'subpuncts' in appendix_content and appendix_content['subpuncts']: | |
subpuncts = appendix_content['subpuncts'] | |
sorted_subpuncts = sorted(subpuncts.items(), key=lambda x: x[0]) | |
# Добавляем разделитель, если есть основной текст | |
if full_text: | |
full_text += "\n\n" | |
# Добавляем все подпункты | |
for i, (key, subtext) in enumerate(sorted_subpuncts): | |
matched_keys.append(f"{matching_key}/{key}") | |
if i > 0: | |
full_text += "\n\n" | |
full_text += f"{key} {subtext}" | |
else: | |
# Если приложение просто строка | |
full_text = appendix_content | |
# Добавляем строку с приложением | |
rows.append({ | |
'id': question_id, | |
'question': question, | |
'filename': doc_name, | |
'text': full_text, | |
'item_type': 'appendix', | |
'item_id': appendix, | |
'matching_keys': ", ".join(matched_keys) | |
}) | |
if debug_mode: | |
print(f"Добавлено приложение {matching_key} для {question_id} с {len(matched_keys)} ключами") | |
if len(matched_keys) > 1: | |
print(f" Включены ключи: {', '.join(matched_keys)}") | |
else: | |
skipped_items += 1 | |
if debug_mode: | |
print(f"Не найдено соответствующее приложение для {appendix} в {doc_name}") | |
extended_df = pd.DataFrame(rows) | |
# Сохраняем расширенный датасет | |
extended_df.to_excel(output_path, index=False) | |
print(f"Расширенный датасет сохранен в {output_path}") | |
print(f"Всего обработано элементов: {total_items}") | |
print(f"Всего элементов в расширенном датасете: {len(extended_df)}") | |
print(f"Пропущено элементов из-за отсутствия соответствия: {skipped_items}") | |
return extended_df | |
def main(): | |
# Парсим аргументы командной строки | |
args = parse_args() | |
# Определяем режим отладки | |
debug = args.debug | |
# Загружаем исходный датасет | |
df = load_dataset(args.input_dataset, debug) | |
# Читаем документы | |
documents = read_documents(args.data_folder) | |
# Извлекаем тексты пунктов и приложений | |
item_texts = extract_item_texts(documents, debug) | |
# Подготавливаем расширенный датасет | |
expanded_df = prepare_expanded_dataset(df, item_texts, args.output_dataset, debug) | |
print("Готово!") | |
if __name__ == "__main__": | |
main() |