Spaces:
Sleeping
Sleeping
#!/usr/bin/env python | |
""" | |
Скрипт для отладки и анализа чанков, найденных для конкретного вопроса. | |
Показывает, какие чанки находятся, какие пункты ожидаются и значения метрик нечеткого сравнения. | |
""" | |
import argparse | |
import json | |
import os | |
import sys | |
from difflib import SequenceMatcher | |
from pathlib import Path | |
import numpy as np | |
import pandas as pd | |
from sklearn.metrics.pairwise import cosine_similarity | |
sys.path.insert(0, str(Path(__file__).parent.parent)) | |
# Константы для настройки | |
DATA_FOLDER = "data/docs" # Путь к папке с документами | |
MODEL_NAME = "intfloat/e5-base" # Название модели для векторизации | |
DATASET_PATH = "data/dataset.xlsx" # Путь к Excel-датасету с вопросами | |
OUTPUT_DIR = "data" # Директория для сохранения результатов | |
TOP_N_VALUES = [5, 10, 20, 30, 50, 100] # Значения N для анализа | |
THRESHOLD = 0.6 | |
def parse_args(): | |
""" | |
Парсит аргументы командной строки. | |
Returns: | |
Аргументы командной строки | |
""" | |
parser = argparse.ArgumentParser(description="Скрипт для отладки чанкинга на конкретном вопросе") | |
parser.add_argument("--data-folder", type=str, default=DATA_FOLDER, | |
help=f"Путь к папке с документами (по умолчанию: {DATA_FOLDER})") | |
parser.add_argument("--model-name", type=str, default=MODEL_NAME, | |
help=f"Название модели для векторизации (по умолчанию: {MODEL_NAME})") | |
parser.add_argument("--dataset-path", type=str, default=DATASET_PATH, | |
help=f"Путь к Excel-датасету с вопросами (по умолчанию: {DATASET_PATH})") | |
parser.add_argument("--output-dir", type=str, default=OUTPUT_DIR, | |
help=f"Директория для сохранения результатов (по умолчанию: {OUTPUT_DIR})") | |
parser.add_argument("--question-id", type=int, required=True, | |
help="ID вопроса для отладки") | |
parser.add_argument("--top-n", type=int, default=20, | |
help="Количество чанков в топе для отладки (по умолчанию: 20)") | |
parser.add_argument("--words-per-chunk", type=int, default=50, | |
help="Количество слов в чанке для fixed_size стратегии (по умолчанию: 50)") | |
parser.add_argument("--overlap-words", type=int, default=25, | |
help="Количество слов перекрытия для fixed_size стратегии (по умолчанию: 25)") | |
return parser.parse_args() | |
def load_questions_dataset(file_path: str) -> pd.DataFrame: | |
""" | |
Загружает датасет с вопросами из Excel-файла. | |
Args: | |
file_path: Путь к Excel-файлу | |
Returns: | |
DataFrame с вопросами и пунктами | |
""" | |
print(f"Загрузка датасета из {file_path}...") | |
df = pd.read_excel(file_path) | |
print(f"Загружен датасет со столбцами: {df.columns.tolist()}") | |
# Преобразуем NaN в пустые строки для текстовых полей | |
text_columns = ['question', 'text', 'item_type'] | |
for col in text_columns: | |
if col in df.columns: | |
df[col] = df[col].fillna('') | |
return df | |
def load_embeddings_and_data(filename: str, output_dir: str) -> tuple[np.ndarray | None, pd.DataFrame | None]: | |
""" | |
Загружает эмбеддинги и соответствующие данные из файлов. | |
Args: | |
filename: Базовое имя файла | |
output_dir: Директория, где хранятся файлы | |
Returns: | |
Кортеж (эмбеддинги, данные) или (None, None), если файлы не найдены | |
""" | |
embeddings_path = os.path.join(output_dir, f"{filename}_embeddings.npy") | |
data_path = os.path.join(output_dir, f"{filename}_data.csv") | |
if os.path.exists(embeddings_path) and os.path.exists(data_path): | |
print(f"Загрузка данных из {embeddings_path} и {data_path}...") | |
embeddings = np.load(embeddings_path) | |
data = pd.read_csv(data_path) | |
return embeddings, data | |
print(f"Ошибка: файлы {embeddings_path} и {data_path} не найдены.") | |
print("Сначала запустите скрипт evaluate_chunking.py для создания эмбеддингов.") | |
sys.exit(1) | |
def calculate_chunk_overlap(chunk_text: str, punct_text: str) -> float: | |
""" | |
Рассчитывает степень перекрытия между чанком и пунктом. | |
Args: | |
chunk_text: Текст чанка | |
punct_text: Текст пункта | |
Returns: | |
Коэффициент перекрытия от 0 до 1 | |
""" | |
# Если чанк входит в пункт, возвращаем 1.0 (полное вхождение) | |
if chunk_text in punct_text: | |
return 1.0 | |
# Если пункт входит в чанк, возвращаем соотношение длин | |
if punct_text in chunk_text: | |
return len(punct_text) / len(chunk_text) | |
# Используем SequenceMatcher для нечеткого сравнения | |
matcher = SequenceMatcher(None, chunk_text, punct_text) | |
# Находим наибольшую общую подстроку | |
match = matcher.find_longest_match(0, len(chunk_text), 0, len(punct_text)) | |
# Если совпадений нет | |
if match.size == 0: | |
return 0.0 | |
# Возвращаем соотношение длины совпадения к минимальной длине | |
return match.size / min(len(chunk_text), len(punct_text)) | |
def format_text_for_display(text: str, max_length: int = 100) -> str: | |
""" | |
Форматирует текст для отображения, обрезая его при необходимости. | |
Args: | |
text: Исходный текст | |
max_length: Максимальная длина для отображения | |
Returns: | |
Отформатированный текст | |
""" | |
if len(text) <= max_length: | |
return text | |
return text[:max_length] + "..." | |
def analyze_question( | |
question_id: int, | |
questions_df: pd.DataFrame, | |
chunks_df: pd.DataFrame, | |
question_embeddings: np.ndarray, | |
chunk_embeddings: np.ndarray, | |
question_id_to_idx: dict, | |
top_n: int | |
) -> dict: | |
""" | |
Анализирует конкретный вопрос и его релевантные чанки. | |
Args: | |
question_id: ID вопроса для анализа | |
questions_df: DataFrame с вопросами | |
chunks_df: DataFrame с чанками | |
question_embeddings: Эмбеддинги вопросов | |
chunk_embeddings: Эмбеддинги чанков | |
question_id_to_idx: Словарь соответствия ID вопроса и его индекса | |
top_n: Количество чанков в топе | |
Returns: | |
Словарь с результатами анализа | |
""" | |
# Проверяем, есть ли вопрос с таким ID | |
if question_id not in question_id_to_idx: | |
print(f"Ошибка: вопрос с ID {question_id} не найден в данных") | |
sys.exit(1) | |
# Получаем строки для выбранного вопроса | |
question_rows = questions_df[questions_df['id'] == question_id] | |
if len(question_rows) == 0: | |
print(f"Ошибка: вопрос с ID {question_id} не найден в исходном датасете") | |
sys.exit(1) | |
# Получаем текст вопроса и его индекс в массиве эмбеддингов | |
question_text = question_rows['question'].iloc[0] | |
question_idx = question_id_to_idx[question_id] | |
# Получаем ожидаемые пункты для вопроса | |
expected_puncts = question_rows['text'].tolist() | |
# Вычисляем косинусную близость между вопросом и всеми чанками | |
similarity = cosine_similarity([question_embeddings[question_idx]], chunk_embeddings)[0] | |
# Получаем связанные документы, если есть | |
related_docs = [] | |
if 'filename' in question_rows.columns: | |
related_docs = question_rows['filename'].unique().tolist() | |
related_docs = [doc for doc in related_docs if doc and not pd.isna(doc)] | |
# Результаты для всех документов | |
all_results = [] | |
# Обрабатываем каждый связанный документ | |
if related_docs: | |
for doc_name in related_docs: | |
# Фильтруем чанки по имени документа | |
doc_chunks = chunks_df[chunks_df['doc_name'] == doc_name] | |
if doc_chunks.empty: | |
continue | |
# Индексы чанков для документа | |
doc_chunk_indices = doc_chunks.index.tolist() | |
# Получаем значения близости для чанков документа | |
doc_similarities = [similarity[chunks_df.index.get_loc(idx)] for idx in doc_chunk_indices] | |
# Создаем словарь индекс -> схожесть | |
similarity_dict = {idx: sim for idx, sim in zip(doc_chunk_indices, doc_similarities)} | |
# Сортируем индексы по убыванию похожести | |
sorted_indices = sorted(similarity_dict.keys(), key=lambda x: similarity_dict[x], reverse=True) | |
# Берем топ-N | |
top_indices = sorted_indices[:min(top_n, len(sorted_indices))] | |
# Получаем топ-N чанков | |
top_chunks = chunks_df.iloc[top_indices] | |
# Формируем результаты для документа | |
doc_results = { | |
'doc_name': doc_name, | |
'top_chunks': [] | |
} | |
# Для каждого чанка | |
for idx, chunk in top_chunks.iterrows(): | |
# Вычисляем перекрытие с каждым пунктом | |
overlaps = [] | |
for punct in expected_puncts: | |
overlap = calculate_chunk_overlap(chunk['text'], punct) | |
overlaps.append({ | |
'punct': format_text_for_display(punct), | |
'overlap': overlap | |
}) | |
# Находим максимальное перекрытие | |
max_overlap = max(overlaps, key=lambda x: x['overlap']) if overlaps else {'overlap': 0} | |
# Добавляем в результаты | |
doc_results['top_chunks'].append({ | |
'chunk_id': chunk['id'], | |
'chunk_text': format_text_for_display(chunk['text']), | |
'similarity': similarity_dict[idx], | |
'overlaps': overlaps, | |
'max_overlap': max_overlap['overlap'], | |
'is_relevant': max_overlap['overlap'] >= THRESHOLD # Используем порог 0.7 | |
}) | |
all_results.append(doc_results) | |
else: | |
# Если нет связанных документов, анализируем чанки из всех документов | |
# Получаем индексы для топ-N чанков по близости | |
top_indices = np.argsort(similarity)[-top_n:][::-1] | |
# Получаем топ-N чанков | |
top_chunks = chunks_df.iloc[top_indices] | |
# Группируем чанки по документам | |
doc_groups = top_chunks.groupby('doc_name') | |
for doc_name, group in doc_groups: | |
doc_results = { | |
'doc_name': doc_name, | |
'top_chunks': [] | |
} | |
for idx, chunk in group.iterrows(): | |
# Вычисляем перекрытие с каждым пунктом | |
overlaps = [] | |
for punct in expected_puncts: | |
overlap = calculate_chunk_overlap(chunk['text'], punct) | |
overlaps.append({ | |
'punct': format_text_for_display(punct), | |
'overlap': overlap | |
}) | |
# Находим максимальное перекрытие | |
max_overlap = max(overlaps, key=lambda x: x['overlap']) if overlaps else {'overlap': 0} | |
# Добавляем в результаты | |
doc_results['top_chunks'].append({ | |
'chunk_id': chunk['id'], | |
'chunk_text': format_text_for_display(chunk['text']), | |
'similarity': similarity[chunks_df.index.get_loc(idx)], | |
'overlaps': overlaps, | |
'max_overlap': max_overlap['overlap'], | |
'is_relevant': max_overlap['overlap'] >= THRESHOLD # Используем порог 0.7 | |
}) | |
all_results.append(doc_results) | |
# Формируем общие результаты для вопроса | |
results = { | |
'question_id': question_id, | |
'question_text': question_text, | |
'expected_puncts': [format_text_for_display(punct) for punct in expected_puncts], | |
'related_docs': related_docs, | |
'results_by_doc': all_results | |
} | |
return results | |
def main(): | |
""" | |
Основная функция скрипта. | |
""" | |
args = parse_args() | |
# Загружаем датасет с вопросами | |
questions_df = load_questions_dataset(args.dataset_path) | |
# Формируем уникальное имя для сохраненных файлов на основе параметров стратегии и модели | |
strategy_config_str = f"fixed_size_w{args.words_per_chunk}_o{args.overlap_words}" | |
chunks_filename = f"chunks_{strategy_config_str}_{args.model_name.replace('/', '_')}" | |
questions_filename = f"questions_{args.model_name.replace('/', '_')}" | |
# Загружаем сохраненные эмбеддинги и данные | |
chunk_embeddings, chunks_df = load_embeddings_and_data(chunks_filename, args.output_dir) | |
question_embeddings, questions_df_with_embeddings = load_embeddings_and_data(questions_filename, args.output_dir) | |
# Создаем словарь соответствия id вопроса и его индекса в эмбеддингах | |
question_id_to_idx = { | |
int(row['id']): i | |
for i, (_, row) in enumerate(questions_df_with_embeddings.iterrows()) | |
} | |
# Анализируем выбранный вопрос для указанного top_n | |
results = analyze_question( | |
args.question_id, | |
questions_df, | |
chunks_df, | |
question_embeddings, | |
chunk_embeddings, | |
question_id_to_idx, | |
args.top_n | |
) | |
# Сохраняем результаты в JSON файл | |
output_filename = f"debug_question_{args.question_id}_top{args.top_n}.json" | |
output_path = os.path.join(args.output_dir, output_filename) | |
with open(output_path, 'w', encoding='utf-8') as f: | |
json.dump(results, f, ensure_ascii=False, indent=2) | |
print(f"Результаты сохранены в {output_path}") | |
# Выводим краткую информацию | |
print(f"\nАнализ вопроса ID {args.question_id}: {results['question_text']}") | |
print(f"Ожидаемые пункты: {len(results['expected_puncts'])}") | |
print(f"Связанные документы: {results['related_docs']}") | |
# Статистика релевантности | |
relevant_chunks = 0 | |
total_chunks = 0 | |
for doc_result in results['results_by_doc']: | |
doc_relevant = sum(1 for chunk in doc_result['top_chunks'] if chunk['is_relevant']) | |
doc_total = len(doc_result['top_chunks']) | |
print(f"\nДокумент: {doc_result['doc_name']}") | |
print(f"Релевантных чанков: {doc_relevant} из {doc_total} ({doc_relevant/doc_total*100:.1f}%)") | |
relevant_chunks += doc_relevant | |
total_chunks += doc_total | |
if total_chunks > 0: | |
print(f"\nОбщая точность: {relevant_chunks/total_chunks*100:.1f}%") | |
else: | |
print("\nНе найдено чанков для анализа") | |
if __name__ == "__main__": | |
main() |