Spaces:
Sleeping
Sleeping
#!/usr/bin/env python | |
""" | |
Скрипт для анализа ненайденных пунктов по лучшему подходу чанкинга (200 слов, 75 перекрытие, baai/bge-m3, top-100). | |
Формирует отчет в формате Markdown с топ-5 наиболее похожими чанками для каждого ненайденного пункта. | |
""" | |
import argparse | |
import json | |
import os | |
import sys | |
from pathlib import Path | |
import numpy as np | |
import pandas as pd | |
from fuzzywuzzy import fuzz | |
from sklearn.metrics.pairwise import cosine_similarity | |
from tqdm import tqdm | |
# Константы | |
DATA_FOLDER = "data/docs" # Путь к папке с документами | |
MODEL_NAME = "BAAI/bge-m3" # Название лучшей модели | |
DATASET_PATH = "data/dataset.xlsx" # Путь к Excel-датасету с вопросами | |
OUTPUT_DIR = "data" # Директория для сохранения результатов | |
MARKDOWN_FILE = "missing_puncts_analysis.md" # Имя выходного MD-файла | |
SIMILARITY_THRESHOLD = 0.7 # Порог для нечеткого сравнения | |
WORDS_PER_CHUNK = 200 # Размер чанка в словах | |
OVERLAP_WORDS = 75 # Перекрытие в словах | |
TOP_N = 100 # Количество чанков в топе | |
sys.path.insert(0, str(Path(__file__).parent.parent)) | |
def parse_args(): | |
""" | |
Парсит аргументы командной строки. | |
Returns: | |
Аргументы командной строки | |
""" | |
parser = argparse.ArgumentParser(description="Анализ ненайденных пунктов для лучшего подхода чанкинга") | |
parser.add_argument("--data-folder", type=str, default=DATA_FOLDER, | |
help=f"Путь к папке с документами (по умолчанию: {DATA_FOLDER})") | |
parser.add_argument("--model-name", type=str, default=MODEL_NAME, | |
help=f"Название модели (по умолчанию: {MODEL_NAME})") | |
parser.add_argument("--dataset-path", type=str, default=DATASET_PATH, | |
help=f"Путь к Excel-датасету с вопросами (по умолчанию: {DATASET_PATH})") | |
parser.add_argument("--output-dir", type=str, default=OUTPUT_DIR, | |
help=f"Директория для сохранения результатов (по умолчанию: {OUTPUT_DIR})") | |
parser.add_argument("--markdown-file", type=str, default=MARKDOWN_FILE, | |
help=f"Имя выходного MD-файла (по умолчанию: {MARKDOWN_FILE})") | |
parser.add_argument("--similarity-threshold", type=float, default=SIMILARITY_THRESHOLD, | |
help=f"Порог для нечеткого сравнения (по умолчанию: {SIMILARITY_THRESHOLD})") | |
parser.add_argument("--words-per-chunk", type=int, default=WORDS_PER_CHUNK, | |
help=f"Размер чанка в словах (по умолчанию: {WORDS_PER_CHUNK})") | |
parser.add_argument("--overlap-words", type=int, default=OVERLAP_WORDS, | |
help=f"Перекрытие в словах (по умолчанию: {OVERLAP_WORDS})") | |
parser.add_argument("--top-n", type=int, default=TOP_N, | |
help=f"Количество чанков в топе (по умолчанию: {TOP_N})") | |
return parser.parse_args() | |
def load_questions_dataset(file_path: str) -> pd.DataFrame: | |
""" | |
Загружает датасет с вопросами из Excel-файла. | |
Args: | |
file_path: Путь к Excel-файлу | |
Returns: | |
DataFrame с вопросами и пунктами | |
""" | |
print(f"Загрузка датасета из {file_path}...") | |
df = pd.read_excel(file_path) | |
print(f"Загружен датасет со столбцами: {df.columns.tolist()}") | |
# Преобразуем NaN в пустые строки для текстовых полей | |
text_columns = ['question', 'text', 'item_type'] | |
for col in text_columns: | |
if col in df.columns: | |
df[col] = df[col].fillna('') | |
return df | |
def load_chunks_and_embeddings(output_dir: str, words_per_chunk: int, overlap_words: int, model_name: str) -> tuple: | |
""" | |
Загружает чанки и эмбеддинги из файлов. | |
Args: | |
output_dir: Директория с файлами | |
words_per_chunk: Размер чанка в словах | |
overlap_words: Перекрытие в словах | |
model_name: Название модели | |
Returns: | |
Кортеж (чанки, эмбеддинги чанков, эмбеддинги вопросов, данные вопросов) | |
""" | |
# Формируем уникальное имя для файлов на основе параметров | |
model_name_safe = model_name.replace('/', '_') | |
strategy_config_str = f"fixed_size_w{words_per_chunk}_o{overlap_words}" | |
chunks_filename = f"chunks_{strategy_config_str}_{model_name_safe}" | |
questions_filename = f"questions_{model_name_safe}" | |
# Пути к файлам | |
chunks_embeddings_path = os.path.join(output_dir, f"{chunks_filename}_embeddings.npy") | |
chunks_data_path = os.path.join(output_dir, f"{chunks_filename}_data.csv") | |
questions_embeddings_path = os.path.join(output_dir, f"{questions_filename}_embeddings.npy") | |
questions_data_path = os.path.join(output_dir, f"{questions_filename}_data.csv") | |
# Проверяем наличие всех файлов | |
for path in [chunks_embeddings_path, chunks_data_path, questions_embeddings_path, questions_data_path]: | |
if not os.path.exists(path): | |
raise FileNotFoundError(f"Файл {path} не найден") | |
# Загружаем данные | |
print(f"Загрузка данных из {output_dir}...") | |
chunks_embeddings = np.load(chunks_embeddings_path) | |
chunks_df = pd.read_csv(chunks_data_path) | |
questions_embeddings = np.load(questions_embeddings_path) | |
questions_df = pd.read_csv(questions_data_path) | |
print(f"Загружено {len(chunks_df)} чанков и {len(questions_df)} вопросов") | |
return chunks_df, chunks_embeddings, questions_embeddings, questions_df | |
def load_top_chunks(top_chunks_dir: str) -> dict: | |
""" | |
Загружает JSON-файлы с топ-чанками для вопросов. | |
Args: | |
top_chunks_dir: Директория с JSON-файлами | |
Returns: | |
Словарь {question_id: данные из JSON} | |
""" | |
print(f"Загрузка топ-чанков из {top_chunks_dir}...") | |
top_chunks_data = {} | |
json_files = list(Path(top_chunks_dir).glob("question_*_top_chunks.json")) | |
for json_file in tqdm(json_files, desc="Загрузка JSON-файлов"): | |
try: | |
with open(json_file, 'r', encoding='utf-8') as f: | |
data = json.load(f) | |
question_id = data.get('question_id') | |
if question_id is not None: | |
top_chunks_data[question_id] = data | |
except Exception as e: | |
print(f"Ошибка при загрузке файла {json_file}: {e}") | |
print(f"Загружены данные для {len(top_chunks_data)} вопросов") | |
return top_chunks_data | |
def calculate_chunk_overlap(chunk_text: str, punct_text: str) -> float: | |
""" | |
Рассчитывает степень перекрытия между чанком и пунктом с использованием partial_ratio. | |
Args: | |
chunk_text: Текст чанка | |
punct_text: Текст пункта | |
Returns: | |
Коэффициент перекрытия от 0 до 1 | |
""" | |
# Если чанк входит в пункт, возвращаем 1.0 (полное вхождение) | |
if chunk_text in punct_text: | |
return 1.0 | |
# Если пункт входит в чанк, возвращаем соотношение длин | |
if punct_text in chunk_text: | |
return len(punct_text) / len(chunk_text) | |
# Используем partial_ratio из fuzzywuzzy | |
partial_ratio_score = fuzz.partial_ratio(chunk_text, punct_text) / 100.0 | |
return partial_ratio_score | |
def find_most_similar_chunks(punct_text: str, chunks_df: pd.DataFrame, chunks_embeddings: np.ndarray, punct_embedding: np.ndarray, top_n: int = 5) -> list: | |
""" | |
Находит топ-N наиболее похожих чанков для заданного пункта. | |
Args: | |
punct_text: Текст пункта | |
chunks_df: DataFrame с чанками | |
chunks_embeddings: Эмбеддинги чанков | |
punct_embedding: Эмбеддинг пункта | |
top_n: Количество похожих чанков (по умолчанию 5) | |
Returns: | |
Список словарей с информацией о похожих чанках | |
""" | |
# Вычисляем косинусную близость между пунктом и всеми чанками | |
similarities = cosine_similarity([punct_embedding], chunks_embeddings)[0] | |
# Получаем индексы топ-N чанков по косинусной близости | |
top_indices = np.argsort(similarities)[-top_n:][::-1] | |
similar_chunks = [] | |
for idx in top_indices: | |
chunk = chunks_df.iloc[idx] | |
overlap = calculate_chunk_overlap(chunk['text'], punct_text) | |
similar_chunks.append({ | |
'chunk_id': chunk['id'], | |
'doc_name': chunk['doc_name'], | |
'text': chunk['text'], | |
'similarity': float(similarities[idx]), | |
'overlap': overlap | |
}) | |
return similar_chunks | |
def analyze_missing_puncts(questions_df: pd.DataFrame, chunks_df: pd.DataFrame, | |
questions_embeddings: np.ndarray, chunks_embeddings: np.ndarray, | |
similarity_threshold: float, top_n: int = 100) -> dict: | |
""" | |
Анализирует ненайденные пункты и находит для них наиболее похожие чанки. | |
Args: | |
questions_df: DataFrame с вопросами и пунктами | |
chunks_df: DataFrame с чанками | |
questions_embeddings: Эмбеддинги вопросов | |
chunks_embeddings: Эмбеддинги чанков | |
similarity_threshold: Порог для определения найденных пунктов | |
top_n: Количество чанков для проверки (по умолчанию 100) | |
Returns: | |
Словарь с результатами анализа | |
""" | |
print("Анализ ненайденных пунктов...") | |
# Проверяем соответствие количества вопросов и эмбеддингов | |
unique_question_ids = questions_df['id'].unique() | |
if len(unique_question_ids) != questions_embeddings.shape[0]: | |
print(f"ВНИМАНИЕ: Количество уникальных ID вопросов ({len(unique_question_ids)}) не соответствует размеру массива эмбеддингов ({questions_embeddings.shape[0]}).") | |
print("Будут анализироваться только вопросы, имеющие соответствующие эмбеддинги.") | |
# Создаем маппинг id вопроса -> индекс в DataFrame с метаданными | |
# Используем порядковый номер в списке уникальных ID, а не порядок строк в DataFrame | |
question_id_to_idx = {qid: idx for idx, qid in enumerate(unique_question_ids)} | |
# Вычисляем косинусную близость между вопросами и чанками | |
similarity_matrix = cosine_similarity(questions_embeddings, chunks_embeddings) | |
# Результаты анализа | |
analysis_results = {} | |
# Обрабатываем только те вопросы, для которых у нас есть эмбеддинги | |
valid_question_ids = [qid for qid in unique_question_ids if qid in question_id_to_idx and question_id_to_idx[qid] < len(questions_embeddings)] | |
# Группируем датасет по id вопроса | |
for question_id in tqdm(valid_question_ids, desc="Анализ вопросов"): | |
# Получаем строки для текущего вопроса | |
question_rows = questions_df[questions_df['id'] == question_id] | |
# Если нет строк с таким id, пропускаем | |
if len(question_rows) == 0: | |
continue | |
# Получаем индекс вопроса в массиве эмбеддингов | |
question_idx = question_id_to_idx[question_id] | |
# Если индекс выходит за границы массива эмбеддингов, пропускаем | |
if question_idx >= questions_embeddings.shape[0]: | |
print(f"ВНИМАНИЕ: Индекс {question_idx} для вопроса {question_id} выходит за границы массива эмбеддингов размера {questions_embeddings.shape[0]}. Пропускаем.") | |
continue | |
# Получаем текст вопроса и пункты | |
question_text = question_rows['question'].iloc[0] | |
# Собираем пункты с информацией о документе | |
puncts = [] | |
for _, row in question_rows.iterrows(): | |
punct_doc = row.get('filename', '') if 'filename' in row else '' | |
if pd.isna(punct_doc): | |
punct_doc = '' | |
puncts.append({ | |
'text': row['text'], | |
'doc_name': punct_doc | |
}) | |
# Получаем связанные документы | |
relevant_docs = [] | |
if 'filename' in question_rows.columns: | |
relevant_docs = [f for f in question_rows['filename'].unique() if f and not pd.isna(f)] | |
else: | |
relevant_docs = chunks_df['doc_name'].unique().tolist() | |
# Если для вопроса нет релевантных документов, пропускаем | |
if not relevant_docs: | |
continue | |
# Для отслеживания найденных и ненайденных пунктов | |
found_puncts = [] | |
missing_puncts = [] | |
# Собираем все чанки для документов вопроса | |
all_question_chunks = [] | |
all_question_similarities = [] | |
for filename in relevant_docs: | |
if not filename or pd.isna(filename): | |
continue | |
# Фильтруем чанки по имени файла | |
doc_chunks = chunks_df[chunks_df['doc_name'] == filename] | |
if doc_chunks.empty: | |
continue | |
# Индексы чанков для текущего файла | |
doc_chunk_indices = doc_chunks.index.tolist() | |
# Проверяем, что индексы чанков существуют в chunks_df | |
valid_indices = [idx for idx in doc_chunk_indices if idx in chunks_df.index] | |
# Получаем значения близости для чанков текущего файла | |
doc_similarities = [] | |
for idx in valid_indices: | |
try: | |
chunk_loc = chunks_df.index.get_loc(idx) | |
doc_similarities.append(similarity_matrix[question_idx, chunk_loc]) | |
except (KeyError, IndexError) as e: | |
print(f"Ошибка при получении индекса для чанка {idx}: {e}") | |
continue | |
# Добавляем чанки и их схожести к общему списку для вопроса | |
for i, idx in enumerate(valid_indices): | |
if i < len(doc_similarities): # проверяем, что у нас есть соответствующее значение similarity | |
try: | |
chunk_row = doc_chunks.loc[idx] | |
all_question_chunks.append((idx, chunk_row)) | |
all_question_similarities.append(doc_similarities[i]) | |
except KeyError as e: | |
print(f"Ошибка при доступе к строке с индексом {idx}: {e}") | |
# Если нет чанков для вопроса, пропускаем | |
if not all_question_chunks: | |
continue | |
# Сортируем все чанки по убыванию схожести и берем top_n | |
sorted_indices = np.argsort(all_question_similarities)[-min(top_n, len(all_question_similarities)):][::-1] | |
top_chunks = [] | |
top_similarities = [] | |
# Собираем топ-N чанков и их схожести | |
for i in sorted_indices: | |
idx, chunk = all_question_chunks[i] | |
top_chunks.append({ | |
'id': chunk['id'], | |
'doc_name': chunk['doc_name'], | |
'text': chunk['text'] | |
}) | |
top_similarities.append(all_question_similarities[i]) | |
# Проверяем каждый пункт на наличие в топ-чанках | |
for i, punct in enumerate(puncts): | |
is_found = False | |
punct_text = punct['text'] | |
punct_doc = punct['doc_name'] | |
# Для каждого чанка из топ-N рассчитываем partial_ratio с пунктом | |
chunk_overlaps = [] | |
for j, chunk in enumerate(top_chunks): | |
overlap = calculate_chunk_overlap(chunk['text'], punct_text) | |
# Если перекрытие больше порога, пункт найден | |
if overlap >= similarity_threshold: | |
is_found = True | |
# Сохраняем информацию о перекрытии для каждого чанка | |
chunk_overlaps.append({ | |
'chunk_id': chunk['id'], | |
'doc_name': chunk['doc_name'], | |
'text': chunk['text'], | |
'overlap': overlap, | |
'similarity': float(top_similarities[j]) | |
}) | |
# Если пункт найден, добавляем в список найденных | |
if is_found: | |
found_puncts.append({ | |
'index': i, | |
'text': punct_text, | |
'doc_name': punct_doc | |
}) | |
else: | |
# Сортируем чанки по убыванию перекрытия с пунктом и берем топ-5 | |
chunk_overlaps.sort(key=lambda x: x['overlap'], reverse=True) | |
top_overlaps = chunk_overlaps[:5] | |
missing_puncts.append({ | |
'index': i, | |
'text': punct_text, | |
'doc_name': punct_doc, | |
'similar_chunks': top_overlaps | |
}) | |
# Добавляем результаты для текущего вопроса | |
analysis_results[question_id] = { | |
'question_id': question_id, | |
'question_text': question_text, | |
'found_puncts_count': len(found_puncts), | |
'missing_puncts_count': len(missing_puncts), | |
'total_puncts_count': len(puncts), | |
'found_puncts': found_puncts, | |
'missing_puncts': missing_puncts | |
} | |
return analysis_results | |
def generate_markdown_report(analysis_results: dict, output_file: str, | |
words_per_chunk: int, overlap_words: int, model_name: str, top_n: int): | |
""" | |
Генерирует отчет в формате Markdown. | |
Args: | |
analysis_results: Результаты анализа | |
output_file: Путь к выходному файлу | |
words_per_chunk: Размер чанка в словах | |
overlap_words: Перекрытие в словах | |
model_name: Название модели | |
top_n: Количество чанков в топе | |
""" | |
print(f"Генерация отчета в формате Markdown в {output_file}...") | |
with open(output_file, 'w', encoding='utf-8') as f: | |
# Заголовок отчета | |
f.write(f"# Анализ ненайденных пунктов для оптимальной конфигурации чанкинга\n\n") | |
# Параметры анализа | |
f.write("## Параметры анализа\n\n") | |
f.write(f"- **Модель**: {model_name}\n") | |
f.write(f"- **Размер чанка**: {words_per_chunk} слов\n") | |
f.write(f"- **Перекрытие**: {overlap_words} слов ({round(overlap_words/words_per_chunk*100, 1)}%)\n") | |
f.write(f"- **Количество чанков в топе**: {top_n}\n\n") | |
# Сводная статистика | |
total_questions = len(analysis_results) | |
total_puncts = sum(q['total_puncts_count'] for q in analysis_results.values()) | |
total_found = sum(q['found_puncts_count'] for q in analysis_results.values()) | |
total_missing = sum(q['missing_puncts_count'] for q in analysis_results.values()) | |
f.write("## Сводная статистика\n\n") | |
f.write(f"- **Всего вопросов**: {total_questions}\n") | |
f.write(f"- **Всего пунктов**: {total_puncts}\n") | |
f.write(f"- **Найдено пунктов**: {total_found} ({round(total_found/total_puncts*100, 1)}%)\n") | |
f.write(f"- **Ненайдено пунктов**: {total_missing} ({round(total_missing/total_puncts*100, 1)}%)\n\n") | |
# Детали по каждому вопросу | |
f.write("## Детальный анализ по вопросам\n\n") | |
# Сортируем вопросы по количеству ненайденных пунктов (по убыванию) | |
sorted_questions = sorted( | |
analysis_results.values(), | |
key=lambda x: x['missing_puncts_count'], | |
reverse=True | |
) | |
for question_data in sorted_questions: | |
question_id = question_data['question_id'] | |
question_text = question_data['question_text'] | |
missing_count = question_data['missing_puncts_count'] | |
total_count = question_data['total_puncts_count'] | |
# Если нет ненайденных пунктов, пропускаем | |
if missing_count == 0: | |
continue | |
f.write(f"### Вопрос {question_id}\n\n") | |
f.write(f"**Текст вопроса**: {question_text}\n\n") | |
f.write(f"**Статистика**: найдено {question_data['found_puncts_count']} из {total_count} пунктов ") | |
f.write(f"({round(question_data['found_puncts_count']/total_count*100, 1)}%)\n\n") | |
# Детали по ненайденным пунктам | |
f.write("#### Ненайденные пункты\n\n") | |
for i, punct in enumerate(question_data['missing_puncts']): | |
punct_text = punct['text'] | |
punct_doc = punct.get('doc_name', '') | |
similar_chunks = punct['similar_chunks'] | |
f.write(f"##### Пункт {i+1}\n\n") | |
f.write(f"**Текст пункта**: {punct_text}\n\n") | |
if punct_doc: | |
f.write(f"**Документ пункта**: {punct_doc}\n\n") | |
f.write("**Топ-5 наиболее похожих чанков**:\n\n") | |
# Таблица с похожими чанками | |
f.write("| № | Документ | Схожесть (с вопросом) | Перекрытие (с пунктом) | Текст чанка |\n") | |
f.write("|---|----------|----------|------------|------------|\n") | |
for j, chunk in enumerate(similar_chunks): | |
# Используем полный текст чанка без обрезки | |
chunk_text = chunk['text'] | |
f.write(f"| {j+1} | {chunk['doc_name']} | {chunk['similarity']:.4f} | ") | |
f.write(f"{chunk['overlap']:.4f} | {chunk_text} |\n") | |
f.write("\n") | |
f.write("\n") | |
print(f"Отчет успешно сгенерирован: {output_file}") | |
def main(): | |
""" | |
Основная функция скрипта. | |
""" | |
args = parse_args() | |
# Загружаем датасет с вопросами | |
questions_df = load_questions_dataset(args.dataset_path) | |
# Загружаем чанки и эмбеддинги | |
chunks_df, chunks_embeddings, questions_embeddings, questions_meta = load_chunks_and_embeddings( | |
args.output_dir, args.words_per_chunk, args.overlap_words, args.model_name | |
) | |
# Анализируем ненайденные пункты | |
analysis_results = analyze_missing_puncts( | |
questions_df, chunks_df, questions_embeddings, chunks_embeddings, | |
args.similarity_threshold, args.top_n | |
) | |
# Генерируем отчет в формате Markdown | |
output_file = os.path.join(args.output_dir, args.markdown_file) | |
generate_markdown_report( | |
analysis_results, output_file, | |
args.words_per_chunk, args.overlap_words, args.model_name, args.top_n | |
) | |
print(f"Анализ ненайденных пунктов завершен. Результаты сохранены в {output_file}") | |
if __name__ == "__main__": | |
main() |