#!/usr/bin/env python """ Скрипт для анализа ненайденных пунктов по лучшему подходу чанкинга (200 слов, 75 перекрытие, baai/bge-m3, top-100). Формирует отчет в формате Markdown с топ-5 наиболее похожими чанками для каждого ненайденного пункта. """ import argparse import json import os import sys from pathlib import Path import numpy as np import pandas as pd from fuzzywuzzy import fuzz from sklearn.metrics.pairwise import cosine_similarity from tqdm import tqdm # Константы DATA_FOLDER = "data/docs" # Путь к папке с документами MODEL_NAME = "BAAI/bge-m3" # Название лучшей модели DATASET_PATH = "data/dataset.xlsx" # Путь к Excel-датасету с вопросами OUTPUT_DIR = "data" # Директория для сохранения результатов MARKDOWN_FILE = "missing_puncts_analysis.md" # Имя выходного MD-файла SIMILARITY_THRESHOLD = 0.7 # Порог для нечеткого сравнения WORDS_PER_CHUNK = 200 # Размер чанка в словах OVERLAP_WORDS = 75 # Перекрытие в словах TOP_N = 100 # Количество чанков в топе sys.path.insert(0, str(Path(__file__).parent.parent)) def parse_args(): """ Парсит аргументы командной строки. Returns: Аргументы командной строки """ parser = argparse.ArgumentParser(description="Анализ ненайденных пунктов для лучшего подхода чанкинга") parser.add_argument("--data-folder", type=str, default=DATA_FOLDER, help=f"Путь к папке с документами (по умолчанию: {DATA_FOLDER})") parser.add_argument("--model-name", type=str, default=MODEL_NAME, help=f"Название модели (по умолчанию: {MODEL_NAME})") parser.add_argument("--dataset-path", type=str, default=DATASET_PATH, help=f"Путь к Excel-датасету с вопросами (по умолчанию: {DATASET_PATH})") parser.add_argument("--output-dir", type=str, default=OUTPUT_DIR, help=f"Директория для сохранения результатов (по умолчанию: {OUTPUT_DIR})") parser.add_argument("--markdown-file", type=str, default=MARKDOWN_FILE, help=f"Имя выходного MD-файла (по умолчанию: {MARKDOWN_FILE})") parser.add_argument("--similarity-threshold", type=float, default=SIMILARITY_THRESHOLD, help=f"Порог для нечеткого сравнения (по умолчанию: {SIMILARITY_THRESHOLD})") parser.add_argument("--words-per-chunk", type=int, default=WORDS_PER_CHUNK, help=f"Размер чанка в словах (по умолчанию: {WORDS_PER_CHUNK})") parser.add_argument("--overlap-words", type=int, default=OVERLAP_WORDS, help=f"Перекрытие в словах (по умолчанию: {OVERLAP_WORDS})") parser.add_argument("--top-n", type=int, default=TOP_N, help=f"Количество чанков в топе (по умолчанию: {TOP_N})") return parser.parse_args() def load_questions_dataset(file_path: str) -> pd.DataFrame: """ Загружает датасет с вопросами из Excel-файла. Args: file_path: Путь к Excel-файлу Returns: DataFrame с вопросами и пунктами """ print(f"Загрузка датасета из {file_path}...") df = pd.read_excel(file_path) print(f"Загружен датасет со столбцами: {df.columns.tolist()}") # Преобразуем NaN в пустые строки для текстовых полей text_columns = ['question', 'text', 'item_type'] for col in text_columns: if col in df.columns: df[col] = df[col].fillna('') return df def load_chunks_and_embeddings(output_dir: str, words_per_chunk: int, overlap_words: int, model_name: str) -> tuple: """ Загружает чанки и эмбеддинги из файлов. Args: output_dir: Директория с файлами words_per_chunk: Размер чанка в словах overlap_words: Перекрытие в словах model_name: Название модели Returns: Кортеж (чанки, эмбеддинги чанков, эмбеддинги вопросов, данные вопросов) """ # Формируем уникальное имя для файлов на основе параметров model_name_safe = model_name.replace('/', '_') strategy_config_str = f"fixed_size_w{words_per_chunk}_o{overlap_words}" chunks_filename = f"chunks_{strategy_config_str}_{model_name_safe}" questions_filename = f"questions_{model_name_safe}" # Пути к файлам chunks_embeddings_path = os.path.join(output_dir, f"{chunks_filename}_embeddings.npy") chunks_data_path = os.path.join(output_dir, f"{chunks_filename}_data.csv") questions_embeddings_path = os.path.join(output_dir, f"{questions_filename}_embeddings.npy") questions_data_path = os.path.join(output_dir, f"{questions_filename}_data.csv") # Проверяем наличие всех файлов for path in [chunks_embeddings_path, chunks_data_path, questions_embeddings_path, questions_data_path]: if not os.path.exists(path): raise FileNotFoundError(f"Файл {path} не найден") # Загружаем данные print(f"Загрузка данных из {output_dir}...") chunks_embeddings = np.load(chunks_embeddings_path) chunks_df = pd.read_csv(chunks_data_path) questions_embeddings = np.load(questions_embeddings_path) questions_df = pd.read_csv(questions_data_path) print(f"Загружено {len(chunks_df)} чанков и {len(questions_df)} вопросов") return chunks_df, chunks_embeddings, questions_embeddings, questions_df def load_top_chunks(top_chunks_dir: str) -> dict: """ Загружает JSON-файлы с топ-чанками для вопросов. Args: top_chunks_dir: Директория с JSON-файлами Returns: Словарь {question_id: данные из JSON} """ print(f"Загрузка топ-чанков из {top_chunks_dir}...") top_chunks_data = {} json_files = list(Path(top_chunks_dir).glob("question_*_top_chunks.json")) for json_file in tqdm(json_files, desc="Загрузка JSON-файлов"): try: with open(json_file, 'r', encoding='utf-8') as f: data = json.load(f) question_id = data.get('question_id') if question_id is not None: top_chunks_data[question_id] = data except Exception as e: print(f"Ошибка при загрузке файла {json_file}: {e}") print(f"Загружены данные для {len(top_chunks_data)} вопросов") return top_chunks_data def calculate_chunk_overlap(chunk_text: str, punct_text: str) -> float: """ Рассчитывает степень перекрытия между чанком и пунктом с использованием partial_ratio. Args: chunk_text: Текст чанка punct_text: Текст пункта Returns: Коэффициент перекрытия от 0 до 1 """ # Если чанк входит в пункт, возвращаем 1.0 (полное вхождение) if chunk_text in punct_text: return 1.0 # Если пункт входит в чанк, возвращаем соотношение длин if punct_text in chunk_text: return len(punct_text) / len(chunk_text) # Используем partial_ratio из fuzzywuzzy partial_ratio_score = fuzz.partial_ratio(chunk_text, punct_text) / 100.0 return partial_ratio_score def find_most_similar_chunks(punct_text: str, chunks_df: pd.DataFrame, chunks_embeddings: np.ndarray, punct_embedding: np.ndarray, top_n: int = 5) -> list: """ Находит топ-N наиболее похожих чанков для заданного пункта. Args: punct_text: Текст пункта chunks_df: DataFrame с чанками chunks_embeddings: Эмбеддинги чанков punct_embedding: Эмбеддинг пункта top_n: Количество похожих чанков (по умолчанию 5) Returns: Список словарей с информацией о похожих чанках """ # Вычисляем косинусную близость между пунктом и всеми чанками similarities = cosine_similarity([punct_embedding], chunks_embeddings)[0] # Получаем индексы топ-N чанков по косинусной близости top_indices = np.argsort(similarities)[-top_n:][::-1] similar_chunks = [] for idx in top_indices: chunk = chunks_df.iloc[idx] overlap = calculate_chunk_overlap(chunk['text'], punct_text) similar_chunks.append({ 'chunk_id': chunk['id'], 'doc_name': chunk['doc_name'], 'text': chunk['text'], 'similarity': float(similarities[idx]), 'overlap': overlap }) return similar_chunks def analyze_missing_puncts(questions_df: pd.DataFrame, chunks_df: pd.DataFrame, questions_embeddings: np.ndarray, chunks_embeddings: np.ndarray, similarity_threshold: float, top_n: int = 100) -> dict: """ Анализирует ненайденные пункты и находит для них наиболее похожие чанки. Args: questions_df: DataFrame с вопросами и пунктами chunks_df: DataFrame с чанками questions_embeddings: Эмбеддинги вопросов chunks_embeddings: Эмбеддинги чанков similarity_threshold: Порог для определения найденных пунктов top_n: Количество чанков для проверки (по умолчанию 100) Returns: Словарь с результатами анализа """ print("Анализ ненайденных пунктов...") # Проверяем соответствие количества вопросов и эмбеддингов unique_question_ids = questions_df['id'].unique() if len(unique_question_ids) != questions_embeddings.shape[0]: print(f"ВНИМАНИЕ: Количество уникальных ID вопросов ({len(unique_question_ids)}) не соответствует размеру массива эмбеддингов ({questions_embeddings.shape[0]}).") print("Будут анализироваться только вопросы, имеющие соответствующие эмбеддинги.") # Создаем маппинг id вопроса -> индекс в DataFrame с метаданными # Используем порядковый номер в списке уникальных ID, а не порядок строк в DataFrame question_id_to_idx = {qid: idx for idx, qid in enumerate(unique_question_ids)} # Вычисляем косинусную близость между вопросами и чанками similarity_matrix = cosine_similarity(questions_embeddings, chunks_embeddings) # Результаты анализа analysis_results = {} # Обрабатываем только те вопросы, для которых у нас есть эмбеддинги valid_question_ids = [qid for qid in unique_question_ids if qid in question_id_to_idx and question_id_to_idx[qid] < len(questions_embeddings)] # Группируем датасет по id вопроса for question_id in tqdm(valid_question_ids, desc="Анализ вопросов"): # Получаем строки для текущего вопроса question_rows = questions_df[questions_df['id'] == question_id] # Если нет строк с таким id, пропускаем if len(question_rows) == 0: continue # Получаем индекс вопроса в массиве эмбеддингов question_idx = question_id_to_idx[question_id] # Если индекс выходит за границы массива эмбеддингов, пропускаем if question_idx >= questions_embeddings.shape[0]: print(f"ВНИМАНИЕ: Индекс {question_idx} для вопроса {question_id} выходит за границы массива эмбеддингов размера {questions_embeddings.shape[0]}. Пропускаем.") continue # Получаем текст вопроса и пункты question_text = question_rows['question'].iloc[0] # Собираем пункты с информацией о документе puncts = [] for _, row in question_rows.iterrows(): punct_doc = row.get('filename', '') if 'filename' in row else '' if pd.isna(punct_doc): punct_doc = '' puncts.append({ 'text': row['text'], 'doc_name': punct_doc }) # Получаем связанные документы relevant_docs = [] if 'filename' in question_rows.columns: relevant_docs = [f for f in question_rows['filename'].unique() if f and not pd.isna(f)] else: relevant_docs = chunks_df['doc_name'].unique().tolist() # Если для вопроса нет релевантных документов, пропускаем if not relevant_docs: continue # Для отслеживания найденных и ненайденных пунктов found_puncts = [] missing_puncts = [] # Собираем все чанки для документов вопроса all_question_chunks = [] all_question_similarities = [] for filename in relevant_docs: if not filename or pd.isna(filename): continue # Фильтруем чанки по имени файла doc_chunks = chunks_df[chunks_df['doc_name'] == filename] if doc_chunks.empty: continue # Индексы чанков для текущего файла doc_chunk_indices = doc_chunks.index.tolist() # Проверяем, что индексы чанков существуют в chunks_df valid_indices = [idx for idx in doc_chunk_indices if idx in chunks_df.index] # Получаем значения близости для чанков текущего файла doc_similarities = [] for idx in valid_indices: try: chunk_loc = chunks_df.index.get_loc(idx) doc_similarities.append(similarity_matrix[question_idx, chunk_loc]) except (KeyError, IndexError) as e: print(f"Ошибка при получении индекса для чанка {idx}: {e}") continue # Добавляем чанки и их схожести к общему списку для вопроса for i, idx in enumerate(valid_indices): if i < len(doc_similarities): # проверяем, что у нас есть соответствующее значение similarity try: chunk_row = doc_chunks.loc[idx] all_question_chunks.append((idx, chunk_row)) all_question_similarities.append(doc_similarities[i]) except KeyError as e: print(f"Ошибка при доступе к строке с индексом {idx}: {e}") # Если нет чанков для вопроса, пропускаем if not all_question_chunks: continue # Сортируем все чанки по убыванию схожести и берем top_n sorted_indices = np.argsort(all_question_similarities)[-min(top_n, len(all_question_similarities)):][::-1] top_chunks = [] top_similarities = [] # Собираем топ-N чанков и их схожести for i in sorted_indices: idx, chunk = all_question_chunks[i] top_chunks.append({ 'id': chunk['id'], 'doc_name': chunk['doc_name'], 'text': chunk['text'] }) top_similarities.append(all_question_similarities[i]) # Проверяем каждый пункт на наличие в топ-чанках for i, punct in enumerate(puncts): is_found = False punct_text = punct['text'] punct_doc = punct['doc_name'] # Для каждого чанка из топ-N рассчитываем partial_ratio с пунктом chunk_overlaps = [] for j, chunk in enumerate(top_chunks): overlap = calculate_chunk_overlap(chunk['text'], punct_text) # Если перекрытие больше порога, пункт найден if overlap >= similarity_threshold: is_found = True # Сохраняем информацию о перекрытии для каждого чанка chunk_overlaps.append({ 'chunk_id': chunk['id'], 'doc_name': chunk['doc_name'], 'text': chunk['text'], 'overlap': overlap, 'similarity': float(top_similarities[j]) }) # Если пункт найден, добавляем в список найденных if is_found: found_puncts.append({ 'index': i, 'text': punct_text, 'doc_name': punct_doc }) else: # Сортируем чанки по убыванию перекрытия с пунктом и берем топ-5 chunk_overlaps.sort(key=lambda x: x['overlap'], reverse=True) top_overlaps = chunk_overlaps[:5] missing_puncts.append({ 'index': i, 'text': punct_text, 'doc_name': punct_doc, 'similar_chunks': top_overlaps }) # Добавляем результаты для текущего вопроса analysis_results[question_id] = { 'question_id': question_id, 'question_text': question_text, 'found_puncts_count': len(found_puncts), 'missing_puncts_count': len(missing_puncts), 'total_puncts_count': len(puncts), 'found_puncts': found_puncts, 'missing_puncts': missing_puncts } return analysis_results def generate_markdown_report(analysis_results: dict, output_file: str, words_per_chunk: int, overlap_words: int, model_name: str, top_n: int): """ Генерирует отчет в формате Markdown. Args: analysis_results: Результаты анализа output_file: Путь к выходному файлу words_per_chunk: Размер чанка в словах overlap_words: Перекрытие в словах model_name: Название модели top_n: Количество чанков в топе """ print(f"Генерация отчета в формате Markdown в {output_file}...") with open(output_file, 'w', encoding='utf-8') as f: # Заголовок отчета f.write(f"# Анализ ненайденных пунктов для оптимальной конфигурации чанкинга\n\n") # Параметры анализа f.write("## Параметры анализа\n\n") f.write(f"- **Модель**: {model_name}\n") f.write(f"- **Размер чанка**: {words_per_chunk} слов\n") f.write(f"- **Перекрытие**: {overlap_words} слов ({round(overlap_words/words_per_chunk*100, 1)}%)\n") f.write(f"- **Количество чанков в топе**: {top_n}\n\n") # Сводная статистика total_questions = len(analysis_results) total_puncts = sum(q['total_puncts_count'] for q in analysis_results.values()) total_found = sum(q['found_puncts_count'] for q in analysis_results.values()) total_missing = sum(q['missing_puncts_count'] for q in analysis_results.values()) f.write("## Сводная статистика\n\n") f.write(f"- **Всего вопросов**: {total_questions}\n") f.write(f"- **Всего пунктов**: {total_puncts}\n") f.write(f"- **Найдено пунктов**: {total_found} ({round(total_found/total_puncts*100, 1)}%)\n") f.write(f"- **Ненайдено пунктов**: {total_missing} ({round(total_missing/total_puncts*100, 1)}%)\n\n") # Детали по каждому вопросу f.write("## Детальный анализ по вопросам\n\n") # Сортируем вопросы по количеству ненайденных пунктов (по убыванию) sorted_questions = sorted( analysis_results.values(), key=lambda x: x['missing_puncts_count'], reverse=True ) for question_data in sorted_questions: question_id = question_data['question_id'] question_text = question_data['question_text'] missing_count = question_data['missing_puncts_count'] total_count = question_data['total_puncts_count'] # Если нет ненайденных пунктов, пропускаем if missing_count == 0: continue f.write(f"### Вопрос {question_id}\n\n") f.write(f"**Текст вопроса**: {question_text}\n\n") f.write(f"**Статистика**: найдено {question_data['found_puncts_count']} из {total_count} пунктов ") f.write(f"({round(question_data['found_puncts_count']/total_count*100, 1)}%)\n\n") # Детали по ненайденным пунктам f.write("#### Ненайденные пункты\n\n") for i, punct in enumerate(question_data['missing_puncts']): punct_text = punct['text'] punct_doc = punct.get('doc_name', '') similar_chunks = punct['similar_chunks'] f.write(f"##### Пункт {i+1}\n\n") f.write(f"**Текст пункта**: {punct_text}\n\n") if punct_doc: f.write(f"**Документ пункта**: {punct_doc}\n\n") f.write("**Топ-5 наиболее похожих чанков**:\n\n") # Таблица с похожими чанками f.write("| № | Документ | Схожесть (с вопросом) | Перекрытие (с пунктом) | Текст чанка |\n") f.write("|---|----------|----------|------------|------------|\n") for j, chunk in enumerate(similar_chunks): # Используем полный текст чанка без обрезки chunk_text = chunk['text'] f.write(f"| {j+1} | {chunk['doc_name']} | {chunk['similarity']:.4f} | ") f.write(f"{chunk['overlap']:.4f} | {chunk_text} |\n") f.write("\n") f.write("\n") print(f"Отчет успешно сгенерирован: {output_file}") def main(): """ Основная функция скрипта. """ args = parse_args() # Загружаем датасет с вопросами questions_df = load_questions_dataset(args.dataset_path) # Загружаем чанки и эмбеддинги chunks_df, chunks_embeddings, questions_embeddings, questions_meta = load_chunks_and_embeddings( args.output_dir, args.words_per_chunk, args.overlap_words, args.model_name ) # Анализируем ненайденные пункты analysis_results = analyze_missing_puncts( questions_df, chunks_df, questions_embeddings, chunks_embeddings, args.similarity_threshold, args.top_n ) # Генерируем отчет в формате Markdown output_file = os.path.join(args.output_dir, args.markdown_file) generate_markdown_report( analysis_results, output_file, args.words_per_chunk, args.overlap_words, args.model_name, args.top_n ) print(f"Анализ ненайденных пунктов завершен. Результаты сохранены в {output_file}") if __name__ == "__main__": main()