muryshev's picture
update
86c402d
raw
history blame
26.5 kB
#!/usr/bin/env python
"""
Скрипт для подготовки датасета с вопросами и текстами пунктов/приложений.
Преобразует исходный датасет, содержащий списки пунктов, в расширенный датасет,
где каждому пункту/приложению соответствует отдельная строка.
"""
import argparse
import sys
from pathlib import Path
from typing import Any, Dict
import pandas as pd
from tqdm import tqdm
from ntr_text_fragmentation import Destructurer
sys.path.insert(0, str(Path(__file__).parent.parent))
from ntr_fileparser import UniversalParser
def parse_args():
"""
Парсит аргументы командной строки.
Returns:
Аргументы командной строки
"""
parser = argparse.ArgumentParser(description="Подготовка датасета с текстами пунктов")
parser.add_argument('--input-dataset', type=str, default='data/dataset.xlsx',
help='Путь к исходному датасету (Excel-файл)')
parser.add_argument('--output-dataset', type=str, default='data/dataset_with_texts.xlsx',
help='Путь для сохранения подготовленного датасета (Excel-файл)')
parser.add_argument('--data-folder', type=str, default='data/docs',
help='Путь к папке с документами')
parser.add_argument('--debug', action='store_true',
help='Включить режим отладки с дополнительным выводом информации')
return parser.parse_args()
def load_dataset(file_path: str, debug: bool = False) -> pd.DataFrame:
"""
Загружает исходный датасет с вопросами.
Args:
file_path: Путь к Excel-файлу
debug: Режим отладки
Returns:
DataFrame с вопросами
"""
print(f"Загрузка исходного датасета из {file_path}...")
df = pd.read_excel(file_path)
# Преобразуем строковые списки в настоящие списки
for col in ['puncts', 'appendices']:
if col in df.columns:
df[col] = df[col].apply(lambda x:
eval(x) if isinstance(x, str) and x.strip()
else ([] if pd.isna(x) else x))
# Вывод отладочной информации о форматах пунктов/приложений
if debug:
all_puncts = set()
all_appendices = set()
for _, row in df.iterrows():
if 'puncts' in row and row['puncts']:
all_puncts.update(row['puncts'])
if 'appendices' in row and row['appendices']:
all_appendices.update(row['appendices'])
print(f"\nУникальные форматы пунктов в датасете ({len(all_puncts)}):")
for i, p in enumerate(sorted(all_puncts)):
if i < 20 or i > len(all_puncts) - 20:
print(f" - {repr(p)}")
elif i == 20:
print(" ... (пропущено)")
print(f"\nУникальные форматы приложений в датасете ({len(all_appendices)}):")
for app in sorted(all_appendices):
print(f" - {repr(app)}")
print(f"Загружено {len(df)} вопросов")
return df
def read_documents(folder_path: str) -> Dict[str, Any]:
"""
Читает все документы из указанной папки.
Args:
folder_path: Путь к папке с документами
Returns:
Словарь {имя_файла: parsed_document}
"""
print(f"Чтение документов из {folder_path}...")
parser = UniversalParser()
documents = {}
for file_path in tqdm(list(Path(folder_path).glob("*.docx")), desc="Чтение документов"):
try:
doc_name = file_path.stem
documents[doc_name] = parser.parse_by_path(str(file_path))
except Exception as e:
print(f"Ошибка при чтении файла {file_path}: {e}")
print(f"Прочитано {len(documents)} документов")
return documents
def normalize_punct_format(punct: str) -> str:
"""
Нормализует формат номера пункта для единообразного сравнения.
Args:
punct: Номер пункта
Returns:
Нормализованный номер пункта
"""
# Убираем пробелы
punct = punct.strip()
# Убираем завершающую точку, если она есть
if punct.endswith('.'):
punct = punct[:-1]
return punct
def normalize_appendix_format(appendix: str) -> str:
"""
Нормализует формат номера приложения для единообразного сравнения.
Args:
appendix: Номер приложения
Returns:
Нормализованный номер приложения
"""
# Убираем пробелы
appendix = appendix.strip()
# Обработка форматов с дефисом (например, "14-1")
if "-" in appendix:
return appendix
return appendix
def find_matching_key(search_key, available_keys, item_type='punct', debug_mode=False):
"""
Ищет наиболее подходящий ключ среди доступных ключей с учетом типа элемента
Args:
search_key: Ключ для поиска
available_keys: Доступные ключи
item_type: Тип элемента ('punct' или 'appendix')
debug_mode: Режим отладки
Returns:
Найденный ключ или None
"""
if not available_keys:
return None
# Нормализуем ключ в зависимости от типа элемента
if item_type == 'punct':
normalized_search_key = normalize_punct_format(search_key)
else: # appendix
normalized_search_key = normalize_appendix_format(search_key)
# Проверяем прямое совпадение ключей
for key in available_keys:
if item_type == 'punct':
normalized_key = normalize_punct_format(key)
else: # appendix
normalized_key = normalize_appendix_format(key)
if normalized_key == normalized_search_key:
if debug_mode:
print(f"Найдено прямое совпадение для {item_type} {search_key} -> {key}")
return key
# Если прямого совпадения нет, проверяем "мягкое" совпадение
# Только для пунктов, не для приложений
if item_type == 'punct':
for key in available_keys:
normalized_key = normalize_punct_format(key)
# Если ключ содержит "/", это подпункт приложения, его не следует сопоставлять с обычным пунктом
if '/' in key and '/' not in search_key:
continue
# Проверяем совпадение конца номера (например, "1.2" и "1.2.")
if normalized_key.rstrip('.') == normalized_search_key.rstrip('.'):
if debug_mode:
print(f"Найдено мягкое совпадение для {search_key} -> {key}")
return key
return None
def extract_item_texts(documents, debug_mode=False):
"""
Извлекает тексты пунктов и приложений из документов.
Args:
documents: Словарь с распарсенными документами {doc_name: document}
debug_mode: Включать ли режим отладки
Returns:
Словарь с текстами пунктов и приложений, организованный по названиям документов
"""
print("Извлечение текстов пунктов и приложений...")
item_texts = {}
all_extracted_items = set()
all_extracted_appendices = set()
for doc_name, document in tqdm(documents.items(), desc="Применение стратегии numbered_items"):
# Используем стратегию numbered_items с режимом отладки
destructurer = Destructurer(document)
destructurer.configure('numbered_items', debug_mode=debug_mode)
entities, _ = destructurer.destructure()
# Инициализируем структуру для документа, если она еще не создана
if doc_name not in item_texts:
item_texts[doc_name] = {
'puncts': {}, # Для пунктов основного текста
'appendices': {} # Для приложений
}
for entity in entities:
# Пропускаем сущность документа
if entity.type == "Document":
continue
# Работаем только с чанками для поиска
if hasattr(entity, 'use_in_search') and entity.use_in_search:
metadata = entity.metadata
text = entity.text
# Для пунктов
if 'item_number' in metadata:
item_number = metadata['item_number']
# Проверяем, является ли пункт подпунктом приложения
if 'appendix_number' in metadata:
# Это подпункт приложения
appendix_number = metadata['appendix_number']
# Создаем структуру для приложения, если ее еще нет
if appendix_number not in item_texts[doc_name]['appendices']:
item_texts[doc_name]['appendices'][appendix_number] = {
'main_text': '', # Основной текст приложения
'subpuncts': {} # Подпункты приложения
}
# Добавляем подпункт в словарь подпунктов
item_texts[doc_name]['appendices'][appendix_number]['subpuncts'][item_number] = text
if debug_mode:
print(f"Извлечен подпункт {item_number} приложения {appendix_number} из {doc_name}")
all_extracted_items.add(item_number)
else:
# Обычный пункт
item_texts[doc_name]['puncts'][item_number] = text
if debug_mode:
print(f"Извлечен пункт {item_number} из {doc_name}")
all_extracted_items.add(item_number)
# Для приложений
elif 'appendix_number' in metadata and 'item_number' not in metadata:
appendix_number = metadata['appendix_number']
# Создаем структуру для приложения, если ее еще нет
if appendix_number not in item_texts[doc_name]['appendices']:
item_texts[doc_name]['appendices'][appendix_number] = {
'main_text': text, # Основной текст приложения
'subpuncts': {} # Подпункты приложения
}
else:
# Если приложение уже существует, обновляем основной текст
item_texts[doc_name]['appendices'][appendix_number]['main_text'] = text
if debug_mode:
print(f"Извлечено приложение {appendix_number} из {doc_name}")
all_extracted_appendices.add(appendix_number)
# Выводим статистику, если включен режим отладки
if debug_mode:
print(f"\nВсего извлечено уникальных пунктов: {len(all_extracted_items)}")
print(f"Примеры форматов пунктов: {', '.join(sorted(list(all_extracted_items))[:20])}")
print(f"\nВсего извлечено уникальных приложений: {len(all_extracted_appendices)}")
print(f"Форматы приложений: {', '.join(sorted(list(all_extracted_appendices)))}")
# Подсчитываем общее количество пунктов и приложений
total_puncts = sum(len(doc_data['puncts']) for doc_data in item_texts.values())
total_appendices = sum(len(doc_data['appendices']) for doc_data in item_texts.values())
print(f"Извлечено {total_puncts} пунктов и {total_appendices} приложений из {len(item_texts)} документов")
return item_texts
def is_subpunct(parent_punct: str, possible_subpunct: str) -> bool:
"""
Проверяет, является ли пункт подпунктом другого пункта.
Args:
parent_punct: Родительский пункт (например, "14")
possible_subpunct: Возможный подпункт (например, "14.1")
Returns:
True, если possible_subpunct является подпунктом parent_punct
"""
# Нормализуем пункты
parent = normalize_punct_format(parent_punct)
child = normalize_punct_format(possible_subpunct)
# Проверяем, начинается ли child с parent и после него идет точка или другой разделитель
if child.startswith(parent):
# Если длины равны, это тот же самый пункт
if len(child) == len(parent):
return False
# Проверяем символ после parent - должна быть точка (дефис исключен, т.к. это разные пункты)
next_char = child[len(parent)]
return next_char in ['.']
return False
def collect_subpuncts(punct: str, all_puncts: dict) -> dict:
"""
Собирает все подпункты для указанного пункта.
Args:
punct: Пункт, для которого нужно найти подпункты (например, "14")
all_puncts: Словарь всех пунктов {punct: text}
Returns:
Словарь {punct: text} с пунктом и всеми его подпунктами
"""
result = {}
normalized_punct = normalize_punct_format(punct)
# Добавляем сам пункт, если он существует
if normalized_punct in all_puncts:
result[normalized_punct] = all_puncts[normalized_punct]
# Ищем подпункты
for possible_subpunct in all_puncts.keys():
if is_subpunct(normalized_punct, possible_subpunct):
result[possible_subpunct] = all_puncts[possible_subpunct]
return result
def prepare_expanded_dataset(df, item_texts, output_path, debug_mode=False):
"""
Подготавливает расширенный датасет, добавляя тексты пунктов и приложений.
Args:
df: Исходный датасет
item_texts: Словарь с текстами пунктов и приложений
output_path: Путь для сохранения расширенного датасета
debug_mode: Включать ли режим отладки
Returns:
Датафрейм с расширенным датасетом
"""
rows = []
skipped_items = 0
total_items = 0
for _, row in df.iterrows():
question_id = row['id']
question = row['question']
filepath = row.get('filepath', '')
# Получаем имя файла без пути
doc_name = Path(filepath).stem if filepath else ''
# Пропускаем, если файл не найден
if not doc_name or doc_name not in item_texts:
if debug_mode and doc_name:
print(f"Документ {doc_name} не найден в извлеченных данных")
continue
# Обрабатываем пункты
puncts = row.get('puncts', [])
if isinstance(puncts, str) and puncts.strip():
# Преобразуем строковое представление в список
try:
puncts = eval(puncts)
except:
puncts = []
if not isinstance(puncts, list):
puncts = []
for punct in puncts:
total_items += 1
if debug_mode:
print(f"\nОбработка пункта {punct} для вопроса {question_id} из {doc_name}")
# Ищем соответствующий пункт в документе
available_keys = list(item_texts[doc_name]['puncts'].keys())
matching_key = find_matching_key(punct, available_keys, 'punct', debug_mode)
if matching_key:
# Сохраняем основной текст пункта
item_text = item_texts[doc_name]['puncts'][matching_key]
# Список всех включенных ключей (для отслеживания что было приконкатенировано)
matched_keys = [matching_key]
# Ищем все подпункты для этого пункта
subpuncts = {}
for key in available_keys:
if is_subpunct(matching_key, key):
subpuncts[key] = item_texts[doc_name]['puncts'][key]
matched_keys.append(key)
# Если есть подпункты, добавляем их к основному тексту
if subpuncts:
# Сортируем подпункты по номеру
sorted_subpuncts = sorted(subpuncts.items(), key=lambda x: x[0])
# Добавляем разделитель и все подпункты
combined_text = item_text
for key, subtext in sorted_subpuncts:
combined_text += f"\n\n{key} {subtext}"
item_text = combined_text
# Добавляем строку с пунктом и его подпунктами
rows.append({
'id': question_id,
'question': question,
'filename': doc_name,
'text': item_text,
'item_type': 'punct',
'item_id': punct,
'matching_keys': ", ".join(matched_keys)
})
if debug_mode:
print(f"Добавлен пункт {matching_key} для {question_id} с {len(matched_keys)} ключами")
if len(matched_keys) > 1:
print(f" Включены ключи: {', '.join(matched_keys)}")
else:
skipped_items += 1
if debug_mode:
print(f"Не найден соответствующий пункт для {punct} в {doc_name}")
# Обрабатываем приложения
appendices = row.get('appendices', [])
if isinstance(appendices, str) and appendices.strip():
# Преобразуем строковое представление в список
try:
appendices = eval(appendices)
except:
appendices = []
if not isinstance(appendices, list):
appendices = []
for appendix in appendices:
total_items += 1
if debug_mode:
print(f"\nОбработка приложения {appendix} для вопроса {question_id} из {doc_name}")
# Ищем соответствующее приложение в документе
available_keys = list(item_texts[doc_name]['appendices'].keys())
matching_key = find_matching_key(appendix, available_keys, 'appendix', debug_mode)
if matching_key:
appendix_content = item_texts[doc_name]['appendices'][matching_key]
# Список всех включенных ключей (для отслеживания что было приконкатенировано)
matched_keys = [matching_key]
# Формируем полный текст приложения, включая все подпункты
if isinstance(appendix_content, dict):
# Начинаем с основного текста
full_text = appendix_content.get('main_text', '')
# Добавляем все подпункты в отсортированном порядке
if 'subpuncts' in appendix_content and appendix_content['subpuncts']:
subpuncts = appendix_content['subpuncts']
sorted_subpuncts = sorted(subpuncts.items(), key=lambda x: x[0])
# Добавляем разделитель, если есть основной текст
if full_text:
full_text += "\n\n"
# Добавляем все подпункты
for i, (key, subtext) in enumerate(sorted_subpuncts):
matched_keys.append(f"{matching_key}/{key}")
if i > 0:
full_text += "\n\n"
full_text += f"{key} {subtext}"
else:
# Если приложение просто строка
full_text = appendix_content
# Добавляем строку с приложением
rows.append({
'id': question_id,
'question': question,
'filename': doc_name,
'text': full_text,
'item_type': 'appendix',
'item_id': appendix,
'matching_keys': ", ".join(matched_keys)
})
if debug_mode:
print(f"Добавлено приложение {matching_key} для {question_id} с {len(matched_keys)} ключами")
if len(matched_keys) > 1:
print(f" Включены ключи: {', '.join(matched_keys)}")
else:
skipped_items += 1
if debug_mode:
print(f"Не найдено соответствующее приложение для {appendix} в {doc_name}")
extended_df = pd.DataFrame(rows)
# Сохраняем расширенный датасет
extended_df.to_excel(output_path, index=False)
print(f"Расширенный датасет сохранен в {output_path}")
print(f"Всего обработано элементов: {total_items}")
print(f"Всего элементов в расширенном датасете: {len(extended_df)}")
print(f"Пропущено элементов из-за отсутствия соответствия: {skipped_items}")
return extended_df
def main():
# Парсим аргументы командной строки
args = parse_args()
# Определяем режим отладки
debug = args.debug
# Загружаем исходный датасет
df = load_dataset(args.input_dataset, debug)
# Читаем документы
documents = read_documents(args.data_folder)
# Извлекаем тексты пунктов и приложений
item_texts = extract_item_texts(documents, debug)
# Подготавливаем расширенный датасет
expanded_df = prepare_expanded_dataset(df, item_texts, args.output_dataset, debug)
print("Готово!")
if __name__ == "__main__":
main()