Spaces:
Sleeping
Sleeping
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
""" | |
Основной пайплайн для оценки качества RAG системы. | |
Этот скрипт выполняет один прогон оценки для заданных параметров: | |
- Чтение документов и датасетов вопросов/ответов. | |
- Чанкинг документов. | |
- Векторизация вопросов и чанков. | |
- Оценка релевантности чанков к пунктам из датасета (Chunk-level). | |
- Сборка контекста из релевантных чанков (Assembly-level). | |
- Оценка релевантности собранного контекста к эталонным ответам. | |
- Сохранение детальных метрик для данного прогона. | |
""" | |
import argparse | |
# Add necessary imports for caching | |
import hashlib | |
import json | |
import os | |
import pickle | |
import sys | |
from pathlib import Path | |
from typing import Any | |
from uuid import UUID, uuid4 | |
import numpy as np | |
import pandas as pd | |
import torch | |
from fuzzywuzzy import fuzz | |
from sklearn.metrics.pairwise import cosine_similarity | |
from tqdm import tqdm | |
from transformers import AutoModel, AutoTokenizer | |
# --- Константы (могут быть переопределены аргументами) --- | |
DEFAULT_DATA_FOLDER = "data/input/docs" | |
DEFAULT_SEARCH_DATASET_PATH = "data/input/search_dataset_texts.xlsx" | |
DEFAULT_QA_DATASET_PATH = "data/input/question_answering.xlsx" | |
DEFAULT_MODEL_NAME = "intfloat/e5-base" | |
DEFAULT_BATCH_SIZE = 8 | |
DEFAULT_DEVICE = "cuda:0" if torch.cuda.is_available() else "cpu" | |
DEFAULT_SIMILARITY_THRESHOLD = 0.7 | |
DEFAULT_OUTPUT_DIR = "data/intermediate" # Директория для промежуточных результатов | |
DEFAULT_WORDS_PER_CHUNK = 50 | |
DEFAULT_OVERLAP_WORDS = 25 | |
DEFAULT_TOP_N = 20 # Значение N по умолчанию для топа чанков | |
# Add chunking strategy constant | |
DEFAULT_CHUNKING_STRATEGY = "fixed_size" | |
# Add cache directory constant | |
DEFAULT_CACHE_DIR = "data/cache" | |
# --- Добавление путей к библиотекам --- | |
# Добавляем путь к корневой папке проекта, чтобы можно было импортировать ntr_... | |
SCRIPT_DIR = Path(__file__).parent.resolve() | |
PROJECT_ROOT = SCRIPT_DIR.parent.parent # Перейти на два уровня вверх (scripts/testing -> scripts -> project root) | |
LIB_EXTRACTOR_PATH = PROJECT_ROOT / "lib" / "extractor" | |
sys.path.insert(0, str(LIB_EXTRACTOR_PATH)) | |
# Добавляем путь к папке с ntr_text_fragmentation | |
sys.path.insert(0, str(LIB_EXTRACTOR_PATH / "ntr_text_fragmentation")) | |
# --- Импорты из локальных модулей --- | |
try: | |
from ntr_fileparser import ParsedDocument, UniversalParser | |
from ntr_text_fragmentation import Destructurer | |
from ntr_text_fragmentation.core.entity_repository import \ | |
InMemoryEntityRepository | |
from ntr_text_fragmentation.core.injection_builder import InjectionBuilder | |
from ntr_text_fragmentation.models.chunk import Chunk | |
from ntr_text_fragmentation.models.document import DocumentAsEntity | |
from ntr_text_fragmentation.models.linker_entity import LinkerEntity | |
except ImportError as e: | |
print(f"Ошибка импорта локальных модулей: {e}") | |
print(f"Проверьте пути: Project Root: {PROJECT_ROOT}, Extractor Lib: {LIB_EXTRACTOR_PATH}") | |
sys.exit(1) | |
# --- Вспомогательные функции (аналогичные evaluate_chunking.py) --- | |
def _average_pool( | |
last_hidden_states: torch.Tensor, attention_mask: torch.Tensor | |
) -> torch.Tensor: | |
""" | |
Расчёт усредненного эмбеддинга по всем токенам. | |
(Копипаста из evaluate_chunking.py) | |
""" | |
last_hidden = last_hidden_states.masked_fill( | |
~attention_mask[..., None].bool(), 0.0 | |
) | |
return last_hidden.sum(dim=1) / attention_mask.sum(dim=1)[..., None] | |
def calculate_chunk_overlap(chunk_text: str, punct_text: str) -> float: | |
""" | |
Рассчитывает степень перекрытия между чанком и пунктом. | |
(Копипаста из evaluate_chunking.py) | |
""" | |
if not chunk_text or not punct_text: | |
return 0.0 | |
# Используем partial_ratio для лучшей обработки подстрок | |
return fuzz.partial_ratio(chunk_text, punct_text) / 100.0 | |
# --- Функции загрузки и обработки данных --- | |
def parse_args(): | |
"""Парсит аргументы командной строки.""" | |
parser = argparse.ArgumentParser(description="Пайплайн оценки RAG системы") | |
# Пути к данным | |
parser.add_argument("--data-folder", type=str, default=DEFAULT_DATA_FOLDER, | |
help=f"Папка с документами (по умолчанию: {DEFAULT_DATA_FOLDER})") | |
parser.add_argument("--search-dataset-path", type=str, default=DEFAULT_SEARCH_DATASET_PATH, | |
help=f"Путь к датасету для поиска (по умолчанию: {DEFAULT_SEARCH_DATASET_PATH})") | |
parser.add_argument("--output-dir", type=str, default=DEFAULT_OUTPUT_DIR, | |
help=f"Папка для сохранения промежуточных результатов (по умолчанию: {DEFAULT_OUTPUT_DIR})") | |
parser.add_argument("--run-id", type=str, default=f"run_{uuid4()}", | |
help="Уникальный идентификатор запуска (по умолчанию: генерируется)") | |
# Параметры модели и векторизации | |
parser.add_argument("--model-name", type=str, default=DEFAULT_MODEL_NAME, | |
help=f"Название модели для векторизации (по умолчанию: {DEFAULT_MODEL_NAME})") | |
parser.add_argument("--batch-size", type=int, default=DEFAULT_BATCH_SIZE, | |
help=f"Размер батча для векторизации (по умолчанию: {DEFAULT_BATCH_SIZE})") | |
parser.add_argument("--device", type=str, default=DEFAULT_DEVICE, # type: ignore | |
help=f"Устройство для вычислений (по умолчанию: {DEFAULT_DEVICE})") | |
parser.add_argument("--use-sentence-transformers", action="store_true", | |
help="Использовать библиотеку sentence_transformers") | |
# Параметры чанкинга | |
parser.add_argument("--chunking-strategy", type=str, default=DEFAULT_CHUNKING_STRATEGY, | |
choices=list(Destructurer.STRATEGIES.keys()), # Use keys from Destructurer | |
help=f"Стратегия чанкинга (по умолчанию: {DEFAULT_CHUNKING_STRATEGY})") | |
parser.add_argument("--strategy-params", type=str, default='{}', # Default to empty JSON object | |
help="Параметры для стратегии чанкинга в формате JSON строки (например, '{\"words_per_chunk\": 50}')") | |
parser.add_argument("--no-process-tables", action="store_false", dest="process_tables", | |
help="Отключить обработку таблиц при чанкинге") | |
parser.set_defaults(process_tables=True) # Default is to process tables | |
# Параметры оценки | |
parser.add_argument("--similarity-threshold", type=float, default=DEFAULT_SIMILARITY_THRESHOLD, | |
help=f"Порог для нечеткого сравнения чанка и пункта (по умолчанию: {DEFAULT_SIMILARITY_THRESHOLD})") | |
parser.add_argument("--top-n", type=int, default=DEFAULT_TOP_N, | |
help=f"Количество топ-чанков для рассмотрения (по умолчанию: {DEFAULT_TOP_N})") | |
# Add cache directory argument | |
parser.add_argument("--cache-dir", type=str, default=DEFAULT_CACHE_DIR, | |
help=f"Директория для кэширования эмбеддингов и матриц схожести (по умолчанию: {DEFAULT_CACHE_DIR})") | |
# Параметры сборки контекста | |
parser.add_argument("--use-injection", action="store_true", | |
help="Выполнять ли сборку контекста и её оценку") | |
parser.add_argument("--use-qe", action="store_true", | |
help="Использовать столбец query_expansion вместо question для поиска (если он есть)") | |
parser.add_argument("--include-neighbors", action="store_true", | |
help="Включать ли соседние чанки (предыдущий/следующий) при сборке контекста") | |
# --- Добавляем аргумент для batch_id --- | |
parser.add_argument("--batch-id", type=str, default="batch_default", | |
help="Идентификатор серии запусков (передается из run_pipelines.py)") | |
# TODO: Добавить другие параметры при необходимости (например, параметры InjectionBuilder) | |
return parser.parse_args() | |
def read_documents(folder_path: str) -> dict[str, ParsedDocument]: | |
""" | |
Читает все документы из указанной папки и создает сущности. | |
Args: | |
folder_path: Путь к папке с документами | |
Returns: | |
Словарь {имя_файла: объект ParsedDocument} | |
""" | |
print(f"Чтение документов из {folder_path}...") | |
parser = UniversalParser() | |
documents_map = {} | |
doc_files = list(Path(folder_path).glob("*.docx")) | |
if not doc_files: | |
print(f"ВНИМАНИЕ: В папке {folder_path} не найдено *.docx файлов.") | |
return {} | |
for file_path in tqdm(doc_files, desc="Чтение документов"): | |
try: | |
doc_name = file_path.stem | |
# Парсим документ с помощью UniversalParser | |
parsed_document = parser.parse_by_path(str(file_path)) | |
# Сохраняем распарсенный документ | |
documents_map[doc_name] = parsed_document | |
except Exception as e: | |
print(f"Ошибка при чтении файла {file_path}: {e}") | |
print(f"Прочитано документов: {len(documents_map)}") | |
return documents_map | |
def load_datasets(search_dataset_path: str) -> tuple[pd.DataFrame, pd.DataFrame]: | |
""" | |
Загружает датасет для поиска и готовит данные для векторизации. | |
Args: | |
search_dataset_path: Путь к Excel с пунктами для поиска. | |
Returns: | |
Кортеж: (полный DataFrame поискового датасета, DataFrame с уникальными вопросами для векторизации). | |
""" | |
print(f"Загрузка поискового датасета из {search_dataset_path}...") | |
try: | |
search_df = pd.read_excel(search_dataset_path) | |
print(f"Загружен поисковый датасет: {len(search_df)} строк, столбцы: {search_df.columns.tolist()}") | |
# Проверяем наличие обязательных столбцов | |
required_columns = ['id', 'question', 'text', 'filename'] | |
missing_cols = [col for col in required_columns if col not in search_df.columns] | |
if missing_cols: | |
print(f"Ошибка: В поисковом датасете отсутствуют обязательные столбцы: {missing_cols}") | |
sys.exit(1) | |
# Преобразуем NaN в пустые строки для текстовых полей | |
# Добавляем 'query_expansion', если он есть, для обработки NaN | |
text_columns = ['question', 'text', 'item_type', 'filename'] | |
if 'query_expansion' in search_df.columns: | |
text_columns.append('query_expansion') | |
for col in text_columns: | |
if col in search_df.columns: | |
search_df[col] = search_df[col].fillna('') | |
# Если необязательный item_type отсутствует, добавляем его пустым | |
elif col == 'item_type': | |
print(f"Предупреждение: столбец '{col}' отсутствует в поисковом датасете. Добавлен пустой столбец.") | |
search_df[col] = '' | |
# Убедимся, что 'id' имеет целочисленный тип | |
try: | |
search_df['id'] = search_df['id'].astype(int) | |
except ValueError as e: | |
print(f"Ошибка при приведении типов столбца 'id' в поисковом датасете: {e}. Убедитесь, что ID являются целыми числами.") | |
sys.exit(1) | |
except FileNotFoundError: | |
print(f"Ошибка: Поисковый датасет не найден по пути {search_dataset_path}") | |
sys.exit(1) | |
except Exception as e: | |
print(f"Ошибка при чтении поискового датасета: {e}") | |
sys.exit(1) | |
# Готовим DataFrame для векторизации уникальных вопросов | |
# Включаем query_expansion, если он есть | |
cols_for_embedding = ['id', 'question'] | |
query_expansion_exists = 'query_expansion' in search_df.columns | |
if query_expansion_exists: | |
cols_for_embedding.append('query_expansion') | |
print("Столбец 'query_expansion' найден в поисковом датасете.") | |
else: | |
print("Столбец 'query_expansion' не найден в поисковом датасете.") | |
questions_to_embed = search_df[cols_for_embedding].drop_duplicates(subset=['id']).copy() | |
# Если query_expansion не существует, добавляем пустой столбец для единообразия | |
if not query_expansion_exists: | |
questions_to_embed['query_expansion'] = '' | |
print(f"Уникальных вопросов для векторизации: {len(questions_to_embed)}") | |
# Теперь search_df это и есть наш "объединенный" датасет (так как QA не используется) | |
return search_df, questions_to_embed | |
def perform_chunking( | |
documents_map: dict[str, ParsedDocument], | |
chunking_strategy: str, | |
process_tables: bool, | |
strategy_params_json: str # Expect JSON string | |
) -> tuple[pd.DataFrame, list[LinkerEntity]]: | |
""" | |
Выполняет чанкинг для всех документов. | |
Args: | |
documents_map: Словарь {имя_файла: сущность_документа}. | |
chunking_strategy: Имя используемой стратегии чанкинга. | |
process_tables: Флаг, указывающий, нужно ли обрабатывать таблицы. | |
strategy_params_json: Строка JSON с параметрами для стратегии. | |
Returns: | |
Кортеж: (DataFrame с чанками для поиска, список всех созданных сущностей LinkerEntity) | |
""" | |
print("Выполнение чанкинга...") | |
searchable_chunks_data = [] # Данные только для чанков с in_search_text | |
final_entities: list[LinkerEntity] = [] # Список для ВСЕХ сущностей (доки, чанки, связи и т.д.) | |
# Parse strategy parameters from JSON string | |
try: | |
chunking_params = json.loads(strategy_params_json) | |
print(f"Параметры для стратегии '{chunking_strategy}': {chunking_params}") | |
except json.JSONDecodeError as e: | |
print(f"Ошибка парсинга JSON для strategy-params: '{strategy_params_json}'. Используются параметры по умолчанию стратегии. Ошибка: {e}") | |
chunking_params = {} # Use strategy defaults if JSON is invalid | |
print(f"Используется стратегия чанкинга: '{chunking_strategy}'") | |
print(f"Обработка таблиц: {'Включена' if process_tables else 'Отключена'}") | |
for doc_name, parsed_doc in tqdm(documents_map.items(), desc="Чанкинг документов"): | |
try: | |
# Инициализируем Destructurer ВНУТРИ цикла для КАЖДОГО документа | |
destructurer = Destructurer( | |
document=parsed_doc, | |
process_tables=process_tables, | |
strategy_name=chunking_strategy, # Передаем имя стратегии при инициализации | |
**chunking_params # И параметры стратегии | |
) | |
# Destructure создает DocumentAsEntity, чанки, связи и возвращает их как LinkerEntity | |
new_entities = destructurer.destructure() | |
# Добавляем ВСЕ созданные сущности (сериализованные LinkerEntity) в общий список | |
final_entities.extend(new_entities) | |
# Собираем данные для DataFrame только из тех сущностей, | |
# у которых есть поле in_search_text (это наши чанки для поиска) | |
for entity in new_entities: | |
# Проверяем наличие атрибута 'in_search_text', а не тип | |
if hasattr(entity, 'in_search_text') and entity.in_search_text: | |
entity_data = { | |
'chunk_id': str(entity.id), | |
'doc_name': doc_name, # Имя исходного файла | |
'doc_id': str(entity.source_id), # ID сущности документа (DocumentAsEntity) | |
'text': entity.in_search_text, # Текст для векторизации и поиска | |
'type': entity.type, # Тип сущности (например, 'FixedSizeChunk') | |
'strategy_params': json.dumps(chunking_params, ensure_ascii=False), | |
} | |
searchable_chunks_data.append(entity_data) | |
except Exception as e: | |
# Логируем ошибку и продолжаем с остальными документами | |
print(f"\nОшибка при чанкинге документа {doc_name}: {e}") | |
import traceback | |
traceback.print_exc() # Печатаем traceback для детальной отладки | |
# Создаем DataFrame только из чанков, предназначенных для поиска | |
chunks_df = pd.DataFrame(searchable_chunks_data) | |
print(f"Создано чанков для поиска: {len(chunks_df)}") | |
# Возвращаем DataFrame с чанками для поиска и ПОЛНЫЙ список всех LinkerEntity | |
return chunks_df, final_entities | |
def setup_model_and_tokenizer(model_name: str, use_sentence_transformers: bool, device: str): | |
"""Инициализирует модель и токенизатор.""" | |
print(f"Загрузка модели {model_name} на устройство {device}...") | |
if use_sentence_transformers: | |
try: | |
from sentence_transformers import SentenceTransformer | |
model = SentenceTransformer(model_name, device=device) | |
tokenizer = None # sentence_transformers не требует отдельного токенизатора | |
print("Используется SentenceTransformer.") | |
return model, tokenizer | |
except ImportError: | |
print("Ошибка: Библиотека sentence_transformers не установлена. Установите: pip install sentence-transformers") | |
sys.exit(1) | |
else: | |
try: | |
tokenizer = AutoTokenizer.from_pretrained(model_name) | |
model = AutoModel.from_pretrained(model_name).to(device) | |
model.eval() | |
print("Используется AutoModel и AutoTokenizer из transformers.") | |
return model, tokenizer | |
except Exception as e: | |
print(f"Ошибка при загрузке модели {model_name} из transformers: {e}") | |
sys.exit(1) | |
def get_embeddings( | |
texts: list[str], | |
model, | |
tokenizer, | |
batch_size: int, | |
use_sentence_transformers: bool, | |
device: str | |
) -> np.ndarray: | |
"""Получает эмбеддинги для списка текстов.""" | |
all_embeddings = [] | |
desc = "Векторизация (Sentence Transformers)" if use_sentence_transformers else "Векторизация (Transformers)" | |
for i in tqdm(range(0, len(texts), batch_size), desc=desc): | |
batch_texts = texts[i:i+batch_size] | |
if not batch_texts: | |
continue | |
if use_sentence_transformers: | |
# Эмбеддинги через sentence_transformers | |
embeddings = model.encode(batch_texts, batch_size=len(batch_texts), show_progress_bar=False) | |
all_embeddings.append(embeddings) | |
else: | |
# Эмбеддинги через transformers с average pooling | |
try: | |
encoding = tokenizer( | |
batch_texts, | |
padding=True, | |
truncation=True, | |
max_length=512, # Стандартное ограничение для многих моделей | |
return_tensors="pt" | |
).to(device) | |
with torch.no_grad(): | |
outputs = model(**encoding) | |
embeddings = _average_pool(outputs.last_hidden_state, encoding["attention_mask"]) | |
all_embeddings.append(embeddings.cpu().numpy()) | |
except Exception as e: | |
print(f"Ошибка при векторизации батча (индексы {i} - {i+batch_size}): {e}") | |
print(f"Тексты батча: {batch_texts[:2]}...") | |
# Добавляем нулевые векторы, чтобы не сломать vstack | |
# Определяем размер эмбеддинга | |
if all_embeddings: | |
embedding_dim = all_embeddings[0].shape[1] | |
else: | |
# Пытаемся получить размер из конфигурации модели | |
try: | |
embedding_dim = model.config.hidden_size | |
except AttributeError: | |
embedding_dim = 768 # Запасной вариант | |
print(f"Не удалось определить размер эмбеддинга, используется {embedding_dim}") | |
print(f"Добавление нулевых эмбеддингов размерности ({len(batch_texts)}, {embedding_dim})") | |
null_embeddings = np.zeros((len(batch_texts), embedding_dim), dtype=np.float32) | |
all_embeddings.append(null_embeddings) | |
if not all_embeddings: | |
print("ВНИМАНИЕ: Не удалось создать эмбеддинги.") | |
# Возвращаем пустой массив правильной формы, если возможно | |
try: | |
embedding_dim = model.config.hidden_size if not use_sentence_transformers else model.get_sentence_embedding_dimension() | |
except: | |
embedding_dim = 768 | |
return np.empty((0, embedding_dim), dtype=np.float32) | |
# Объединяем эмбеддинги из всех батчей | |
try: | |
final_embeddings = np.vstack(all_embeddings) | |
except ValueError as e: | |
print(f"Ошибка при объединении эмбеддингов: {e}") | |
print("Размеры эмбеддингов в батчах:") | |
for i, emb_batch in enumerate(all_embeddings): | |
print(f" Батч {i}: {emb_batch.shape}") | |
# Попробуем определить ожидаемый размер и создать нулевой массив | |
if all_embeddings: | |
embedding_dim = all_embeddings[0].shape[1] | |
print(f"Возвращение нулевого массива размерности ({len(texts)}, {embedding_dim})") | |
return np.zeros((len(texts), embedding_dim), dtype=np.float32) | |
else: | |
return np.empty((0, 768), dtype=np.float32) # Запасной вариант | |
print(f"Получено эмбеддингов: {final_embeddings.shape}") | |
return final_embeddings | |
# --- Caching Helper Functions --- | |
def _get_params_hash( | |
model_name: str, | |
process_tables: bool | None = None, | |
strategy_params: dict | None = None # Expect the parsed dictionary | |
) -> str: | |
"""Создает MD5 хэш из переданных параметров.""" | |
hasher = hashlib.md5() | |
hasher.update(model_name.encode()) | |
# Add chunking strategy and table processing flag if provided | |
if process_tables is not None: | |
hasher.update(str(process_tables).encode()) | |
# Add strategy parameters (sort items to ensure consistent hash) | |
if strategy_params: | |
sorted_params = sorted(strategy_params.items()) | |
hasher.update(json.dumps(sorted_params).encode()) | |
return hasher.hexdigest() | |
def _get_cache_path(cache_dir: Path, hash_str: str, filename: str) -> Path: | |
"""Формирует путь к файлу кэша, создавая поддиректории.""" | |
# Используем первые 2 символа хэша для распределения по поддиректориям | |
# Это помогает избежать слишком большого количества файлов в одной директории | |
cache_subdir = cache_dir / hash_str[:2] / hash_str | |
cache_subdir.mkdir(parents=True, exist_ok=True) | |
return cache_subdir / filename | |
# --- Добавляем функцию для хэша чанкинга --- | |
def _get_chunking_cache_hash( | |
data_folder: str, | |
chunking_strategy: str, | |
process_tables: bool, | |
strategy_params: dict # Ожидаем словарь | |
) -> str: | |
"""Создает MD5 хэш для параметров чанкинга и папки с данными.""" | |
hasher = hashlib.md5() | |
hasher.update(data_folder.encode()) | |
hasher.update(chunking_strategy.encode()) | |
hasher.update(str(process_tables).encode()) | |
# Сортируем параметры для консистентности хэша | |
sorted_params = sorted(strategy_params.items()) | |
hasher.update(json.dumps(sorted_params).encode()) | |
return hasher.hexdigest() | |
# --------------------------------------------- | |
# --- Main Evaluation Function --- | |
def evaluate_run( | |
search_dataset: pd.DataFrame, | |
questions_to_embed: pd.DataFrame, | |
chunks_df: pd.DataFrame, | |
all_entities: list[LinkerEntity], | |
model: Any | None, # Принимаем None | |
tokenizer: Any | None, # Принимаем None | |
args: argparse.Namespace | |
) -> pd.DataFrame: | |
""" | |
Выполняет основной цикл оценки для одного набора параметров. | |
Args: | |
search_dataset: DataFrame поискового датасета. | |
questions_to_embed: DataFrame с уникальными вопросами для векторизации. | |
chunks_df: DataFrame с данными по чанкам. | |
all_entities: Список всех сущностей (документы, чанки, связи). | |
model: Модель для векторизации. | |
tokenizer: Токенизатор. | |
args: Аргументы командной строки. | |
Returns: | |
DataFrame с детальными метриками по каждому вопросу для этого запуска. | |
""" | |
print("Начало этапа оценки...") | |
# Переменные для модели и токенизатора, инициализируем None | |
loaded_model: Any | None = model | |
loaded_tokenizer: Any | None = tokenizer | |
# --- Caching Setup --- | |
print("Настройка кэширования...") | |
CACHE_DIR_PATH = Path(args.cache_dir) | |
model_slug = args.model_name.split('/')[-1] # Basic slug for filename clarity | |
# --- Определяем, какой текст использовать для эмбеддингов вопросов --- | |
# и устанавливаем флаг qe_active, который будет влиять на кэш | |
if args.use_qe and 'query_expansion' in questions_to_embed.columns and questions_to_embed['query_expansion'].notna().any(): # Check if column exists and has non-NA values | |
print("Используется Query Expansion (столбец 'query_expansion') для векторизации вопросов.") | |
query_texts_to_embed = questions_to_embed['query_expansion'].tolist() | |
qe_active = True | |
else: | |
print("Используется оригинальный текст вопроса (столбец 'question') для векторизации.") | |
query_texts_to_embed = questions_to_embed['question'].tolist() | |
qe_active = False | |
# Cache key for question embeddings (ЗАВИСИТ от модели и флага use_qe) | |
question_params_for_hash = { | |
'model_name': args.model_name, | |
'use_qe': qe_active # Добавляем фактическое использование QE в параметры для хэша | |
} | |
question_hash = hashlib.md5(json.dumps(question_params_for_hash, sort_keys=True).encode()).hexdigest() | |
question_embeddings_cache_path = _get_cache_path( | |
CACHE_DIR_PATH, question_hash, f"q_embeddings_{model_slug}_qe{qe_active}.npy" | |
) | |
# Cache key for chunk embeddings (depends on model and chunking) | |
chunk_hash = _get_params_hash( | |
args.model_name, | |
args.process_tables, # Include table flag | |
json.loads(args.strategy_params) # Pass parsed params dictionary | |
) | |
chunk_embeddings_cache_path = _get_cache_path( | |
CACHE_DIR_PATH, chunk_hash, | |
f"c_emb_{model_slug}_s-{args.chunking_strategy}_t{args.process_tables}_ph-{hashlib.md5(args.strategy_params.encode()).hexdigest()[:8]}.npy" | |
) | |
# Cache key for similarity matrix (depends on both sets of embeddings) | |
similarity_hash = f"{question_hash}_{chunk_hash}" # Combine hashes | |
similarity_cache_path = _get_cache_path( | |
CACHE_DIR_PATH, similarity_hash, | |
f"sim_{model_slug}_qe{qe_active}_ph-{hashlib.md5(args.strategy_params.encode()).hexdigest()[:8]}.npy" # Добавляем флаг QE в имя файла | |
) | |
# 1. Векторизация вопросов и чанков (с кэшем) | |
question_embeddings = None | |
needs_model_load = False # Флаг, указывающий, нужна ли загрузка модели | |
if question_embeddings_cache_path.exists(): | |
try: | |
print(f"Загрузка кэшированных эмбеддингов вопросов из: {question_embeddings_cache_path}") | |
question_embeddings = np.load(question_embeddings_cache_path, allow_pickle=False) | |
if len(question_embeddings) != len(questions_to_embed): | |
print(f"Предупреждение: Размер кэша эмбеддингов вопросов не совпадает. Пересчет.") | |
question_embeddings = None | |
else: | |
print("Кэш эмбеддингов вопросов успешно загружен.") | |
except Exception as e: | |
print(f"Ошибка загрузки кэша эмбеддингов вопросов: {e}. Пересчет.") | |
question_embeddings = None | |
if question_embeddings is None: | |
needs_model_load = True # Требуется модель для генерации эмбеддингов | |
print("Векторизация вопросов (потребуется загрузка модели)...") | |
chunk_embeddings = None | |
if chunk_embeddings_cache_path.exists(): | |
try: | |
print(f"Загрузка кэшированных эмбеддингов чанков из: {chunk_embeddings_cache_path}") | |
chunk_embeddings = np.load(chunk_embeddings_cache_path, allow_pickle=False) | |
if len(chunk_embeddings) != len(chunks_df): | |
print(f"Предупреждение: Размер кэша эмбеддингов чанков не совпадает. Пересчет.") | |
chunk_embeddings = None | |
else: | |
print("Кэш эмбеддингов чанков успешно загружен.") | |
except Exception as e: | |
print(f"Ошибка загрузки кэша эмбеддингов чанков: {e}. Пересчет.") | |
chunk_embeddings = None | |
if chunk_embeddings is None: | |
needs_model_load = True # Требуется модель для генерации эмбеддингов | |
print("Векторизация чанков (потребуется загрузка модели)...") | |
# --- Отложенная загрузка модели, если необходимо --- | |
if needs_model_load and loaded_model is None: | |
print("\n--- Загрузка модели и токенизатора (т.к. кэш эмбеддингов отсутствует) ---") | |
loaded_model, loaded_tokenizer = setup_model_and_tokenizer( | |
args.model_name, args.use_sentence_transformers, args.device | |
) | |
print("--- Модель и токенизатор загружены ---\n") | |
# --- Повторная генерация эмбеддингов, если они не загрузились из кэша --- | |
if question_embeddings is None: | |
if loaded_model is None: | |
print("Критическая ошибка: Модель не загружена, но требуется для векторизации вопросов!") | |
# Возвращаем пустой DataFrame или выбрасываем исключение | |
return pd.DataFrame() | |
print("Повторная векторизация вопросов...") | |
question_embeddings = get_embeddings( | |
query_texts_to_embed, | |
loaded_model, loaded_tokenizer, args.batch_size, args.use_sentence_transformers, args.device | |
) | |
if question_embeddings.shape[0] > 0: | |
try: | |
print(f"Сохранение эмбеддингов вопросов в кэш: {question_embeddings_cache_path}") | |
np.save(question_embeddings_cache_path, question_embeddings, allow_pickle=False) | |
except Exception as e: | |
print(f"Не удалось сохранить кэш эмбеддингов вопросов: {e}") | |
if chunk_embeddings is None: | |
if loaded_model is None: | |
print("Критическая ошибка: Модель не загружена, но требуется для векторизации чанков!") | |
return pd.DataFrame() | |
print("Повторная векторизация чанков...") | |
chunk_texts = chunks_df['text'].fillna('').astype(str).tolist() | |
chunk_embeddings = get_embeddings( | |
chunk_texts, | |
loaded_model, loaded_tokenizer, args.batch_size, args.use_sentence_transformers, args.device | |
) | |
if chunk_embeddings.shape[0] > 0: | |
try: | |
print(f"Сохранение эмбеддингов чанков в кэш: {chunk_embeddings_cache_path}") | |
np.save(chunk_embeddings_cache_path, chunk_embeddings, allow_pickle=False) | |
except Exception as e: | |
print(f"Не удалось сохранить кэш эмбеддингов чанков: {e}") | |
# Проверка совпадения количества эмбеддингов и данных | |
if len(question_embeddings) != len(questions_to_embed): | |
print(f"Ошибка: Количество эмбеддингов вопросов ({len(question_embeddings)}) не совпадает с количеством уникальных вопросов ({len(questions_to_embed)}).") | |
# Можно либо прервать выполнение, либо попытаться исправить | |
# Например, взять первые N эмбеддингов, но это может быть некорректно | |
sys.exit(1) | |
if len(chunk_embeddings) != len(chunks_df): | |
print(f"Ошибка: Количество эмбеддингов чанков ({len(chunk_embeddings)}) не совпадает с количеством чанков в DataFrame ({len(chunks_df)}).") | |
# Попытка исправить (если ошибка небольшая) или выход | |
if abs(len(chunk_embeddings) - len(chunks_df)) < 5: | |
print("Попытка обрезать лишние эмбеддинги/данные...") | |
min_len = min(len(chunk_embeddings), len(chunks_df)) | |
chunk_embeddings = chunk_embeddings[:min_len] | |
chunks_df = chunks_df.iloc[:min_len] | |
print(f"Размеры выровнены до {min_len}") | |
else: | |
sys.exit(1) | |
# Создаем маппинг ID вопроса к индексу в эмбеддингах | |
question_id_to_idx = { | |
row['id']: i for i, (_, row) in enumerate(questions_to_embed.iterrows()) | |
} | |
# 2. Расчет косинусной близости | |
print("Расчет косинусной близости...") | |
# Проверка на пустые эмбеддинги | |
if question_embeddings.shape[0] == 0 or chunk_embeddings.shape[0] == 0: | |
print("Ошибка: Отсутствуют эмбеддинги вопросов или чанков для расчета близости.") | |
# Возвращаем пустой DataFrame или обрабатываем ошибку иначе | |
return pd.DataFrame() | |
similarity_matrix = cosine_similarity(question_embeddings, chunk_embeddings) | |
# 3. Инициализация InjectionBuilder (если нужно) | |
injection_builder = None | |
if args.use_injection: | |
print("Инициализация InjectionBuilder...") | |
repository = InMemoryEntityRepository(all_entities) | |
injection_builder = InjectionBuilder(repository) | |
# TODO: Зарегистрировать стратегии, если необходимо | |
# builder.register_strategy(...) | |
# 4. Цикл по уникальным вопросам для оценки | |
results = [] | |
print(f"Оценка для {len(questions_to_embed)} уникальных вопросов...") | |
for question_id_iter, question_data in tqdm(questions_to_embed.iterrows(), total=len(questions_to_embed), desc="Оценка вопросов"): # Renamed loop variable | |
q_id = question_data['id'] | |
q_text = question_data['question'] | |
# Получаем все строки из исходного датасета для этого вопроса | |
question_rows = search_dataset[search_dataset['id'] == q_id] # Use search_dataset | |
if question_rows.empty: | |
print(f"Предупреждение: Нет данных в search_dataset для вопроса ID={q_id}") | |
continue | |
# Получаем пункты (relevant items) | |
puncts = question_rows['text'].tolist() | |
# reference_answer больше не используется и не извлекается | |
# Получаем индекс вопроса в матрице близости | |
if q_id not in question_id_to_idx: | |
print(f"Предупреждение: Вопрос ID={q_id} не найден в маппинге эмбеддингов.") | |
continue | |
question_idx = question_id_to_idx[q_id] | |
# --- Оценка на уровне чанков (Chunk-level) --- | |
chunk_level_metrics = evaluate_chunk_relevance( | |
q_id, question_idx, puncts, | |
similarity_matrix, chunks_df, args.top_n, args.similarity_threshold | |
) | |
# --- Оценка на уровне сборки (Assembly-level) --- | |
# Удаляем assembly_relevance, основанный на reference_answer | |
assembly_level_metrics = {} # Start with an empty dict for assembly metrics | |
assembled_context = "" | |
top_chunk_indices = chunk_level_metrics.get("top_chunk_ids", []) # Get indices first | |
neighbors_included = False # Flag to log | |
if args.use_injection and injection_builder and top_chunk_indices: | |
try: | |
# Преобразуем ID строк обратно в UUID чанков | |
top_chunk_uuids = [UUID(chunks_df.iloc[idx]['chunk_id']) for idx in top_chunk_indices] | |
final_chunk_uuids_for_assembly = set(top_chunk_uuids) # Start with top chunks | |
# --- Добавляем соседей, если нужно --- | |
if args.include_neighbors: | |
neighbors_included = True | |
# --- Убираем логирование индексов --- | |
neighbor_chunks = repository.get_neighboring_chunks(chunk_ids=top_chunk_uuids, max_distance=1) | |
neighbor_ids = {neighbor.id for neighbor in neighbor_chunks} | |
# --- Логирование до/после добавления ID соседей --- | |
print(f" [DEBUG QID {q_id}] Кол-во ID до добавления соседей: {len(final_chunk_uuids_for_assembly)}") | |
print(f" [DEBUG QID {q_id}] Кол-во найденных ID соседей: {len(neighbor_ids)}") | |
final_chunk_uuids_for_assembly.update(neighbor_ids) | |
print(f" [DEBUG QID {q_id}] Кол-во ID после добавления соседей: {len(final_chunk_uuids_for_assembly)}") | |
# --- Конец логирования --- | |
# --- Убираем логирование индексов --- | |
else: | |
# --- Убираем логирование индексов --- | |
pass # Ничего не делаем, если соседи не включены | |
# Собираем контекст | |
# Передаем финальный набор UUID (уникальный) | |
assembled_context = injection_builder.build( | |
filtered_entities=list(final_chunk_uuids_for_assembly), | |
# chunk_scores= {chunks_df.loc[idx, 'chunk_id']: sim for idx, sim in zip(top_chunk_ids_for_assembly, chunk_level_metrics.get('top_chunk_similarities',[]))} # Можно добавить веса | |
) | |
# --- Новая метрика: Assembly Punct Recall --- | |
# Оцениваем, сколько пунктов из датасета найдено в собранном контексте | |
# (по вашей идее: пункт считается найденным, если хотя бы одна его часть, | |
# разделенная переносом строки, найдена в контексте) | |
assembly_found_puncts = 0 | |
assembly_total_puncts = len(puncts) | |
if assembly_total_puncts > 0 and assembled_context: | |
# Итерируемся по каждому исходному пункту | |
for punct_text in puncts: | |
# Разбиваем пункт на части по переносу строки | |
# Убираем пустые строки, которые могут появиться из-за двойных переносов | |
punct_parts = [part for part in punct_text.split('\n') if part.strip()] | |
# Если пункт пустой или состоит только из пробельных символов после разбивки, | |
# пропускаем его (не считаем ни найденным, ни не найденным в контексте recall) | |
if not punct_parts: | |
assembly_total_puncts -= 1 # Уменьшаем общее число пунктов для расчета recall | |
continue | |
is_punct_found = False | |
# Итерируемся по частям пункта | |
for part_text in punct_parts: | |
# Сравниваем КАЖДУЮ ЧАСТЬ пункта с собранным контекстом | |
if calculate_chunk_overlap(assembled_context, part_text.strip()) >= args.similarity_threshold: | |
# Если ХОТЯ БЫ ОДНА часть найдена, считаем ВЕСЬ пункт найденным | |
is_punct_found = True | |
break # Дальше части этого пункта можно не проверять | |
# Если флаг is_punct_found стал True, увеличиваем счетчик найденных пунктов | |
if is_punct_found: | |
assembly_found_puncts += 1 | |
# Рассчитываем recall, только если были валидные пункты для проверки | |
if assembly_total_puncts > 0: | |
assembly_level_metrics["assembly_punct_recall"] = assembly_found_puncts / assembly_total_puncts | |
else: | |
assembly_level_metrics["assembly_punct_recall"] = 0.0 # Или можно None, если нет валидных пунктов | |
else: | |
assembly_level_metrics["assembly_punct_recall"] = 0.0 | |
# Добавляем сам текст сборки для возможного анализа (усеченный) | |
assembly_level_metrics["assembled_context_preview"] = assembled_context[:500] + ("..." if len(assembled_context) > 500 else "") | |
except Exception as e: | |
print(f"Ошибка при сборке/оценке контекста для вопроса ID={q_id}: {e}") | |
# Записываем None или 0, чтобы не прерывать процесс | |
assembly_level_metrics["assembly_punct_recall"] = None # Indicate error | |
assembly_level_metrics["assembled_context_preview"] = f"Error during assembly: {e}" | |
# Собираем все метрики для вопроса | |
question_result = { | |
"run_id": args.run_id, | |
"batch_id": args.batch_id, # --- Добавляем batch_id в результаты --- | |
"question_id": q_id, | |
"question_text": q_text, | |
# Параметры запуска | |
"model_name": args.model_name, | |
"chunking_strategy": args.chunking_strategy, # Log strategy | |
"process_tables": args.process_tables, # Log table flag | |
"strategy_params": args.strategy_params, # Log JSON string | |
"top_n": args.top_n, | |
"use_injection": args.use_injection, | |
"use_qe": qe_active, # Log QE status | |
"neighbors_included": neighbors_included, # Log neighbor flag | |
"similarity_threshold": args.similarity_threshold, | |
# Метрики Chunk-level | |
**chunk_level_metrics, | |
# Метрики Assembly-level (теперь с recall по пунктам) | |
**assembly_level_metrics, | |
# Тексты для отладки (эталонный ответ удален, сборка добавлена выше) | |
# "assembled_context": assembled_context[:500] + "..." if assembled_context else "", | |
} | |
results.append(question_result) | |
print("Оценка завершена.") | |
return pd.DataFrame(results) | |
def evaluate_chunk_relevance( | |
question_id: int, | |
question_idx: int, | |
puncts: list[str], | |
similarity_matrix: np.ndarray, | |
chunks_df: pd.DataFrame, | |
top_n: int, | |
similarity_threshold: float | |
) -> dict: | |
""" | |
Оценивает релевантность чанков для одного вопроса. | |
(Адаптировано из evaluate_for_top_n_with_mapping в evaluate_chunking.py) | |
Возвращает словарь с метриками для этого вопроса. | |
""" | |
metrics = { | |
"chunk_text_precision": 0.0, | |
"chunk_text_recall": 0.0, | |
"chunk_text_f1": 0.0, | |
"found_puncts": 0, | |
"total_puncts": len(puncts), | |
"relevant_chunks": 0, | |
"total_chunks_in_top_n": 0, | |
"top_chunk_ids": [], # Индексы строк в chunks_df | |
"top_chunk_similarities": [], | |
} | |
if chunks_df.empty or similarity_matrix.shape[1] == 0: | |
print(f"Предупреждение (QID {question_id}): Нет чанков для оценки.") | |
return metrics | |
# Получаем схожести всех чанков с текущим вопросом | |
question_similarities = similarity_matrix[question_idx, :] | |
# Сортируем чанки по схожести и берем top_n | |
# argsort возвращает индексы элементов, которые бы отсортировали массив | |
# Берем последние N индексов (-top_n:) и разворачиваем ([::-1]) для убывания | |
# Добавляем проверку на случай если top_n > количества чанков | |
if top_n >= similarity_matrix.shape[1]: | |
sorted_chunk_indices = np.argsort(question_similarities)[::-1] # Берем все, сортируем по убыванию | |
else: | |
sorted_chunk_indices = np.argsort(question_similarities)[-top_n:][::-1] | |
# Ограничиваем top_n, если чанков меньше (это должно быть сделано выше, но дублируем для надежности) | |
actual_top_n = min(top_n, len(sorted_chunk_indices)) | |
top_chunk_indices = sorted_chunk_indices[:actual_top_n] | |
# Сохраняем ID и схожести топ-чанков | |
metrics["top_chunk_ids"] = top_chunk_indices.tolist() | |
metrics["top_chunk_similarities"] = question_similarities[top_chunk_indices].tolist() | |
# Отбираем данные топ-чанков | |
top_chunks_df = chunks_df.iloc[top_chunk_indices] | |
metrics["total_chunks_in_top_n"] = len(top_chunks_df) | |
if metrics["total_chunks_in_top_n"] == 0: | |
return metrics # Если нет топ-чанков, метрики остаются нулевыми | |
# Оценка на основе текста (пунктов) | |
punct_found = [False] * metrics["total_puncts"] | |
question_relevant_chunks = 0 | |
for i, (idx, chunk_row) in enumerate(top_chunks_df.iterrows()): | |
chunk_text = chunk_row['text'] | |
is_relevant_to_punct = False | |
for j, punct_text in enumerate(puncts): | |
overlap = calculate_chunk_overlap(chunk_text, punct_text) | |
if overlap >= similarity_threshold: | |
is_relevant_to_punct = True | |
punct_found[j] = True | |
if is_relevant_to_punct: | |
question_relevant_chunks += 1 | |
metrics["found_puncts"] = sum(punct_found) | |
metrics["relevant_chunks"] = question_relevant_chunks | |
if metrics["total_chunks_in_top_n"] > 0: | |
metrics["chunk_text_precision"] = metrics["relevant_chunks"] / metrics["total_chunks_in_top_n"] | |
if metrics["total_puncts"] > 0: | |
metrics["chunk_text_recall"] = metrics["found_puncts"] / metrics["total_puncts"] | |
if metrics["chunk_text_precision"] + metrics["chunk_text_recall"] > 0: | |
metrics["chunk_text_f1"] = (2 * metrics["chunk_text_precision"] * metrics["chunk_text_recall"] / | |
(metrics["chunk_text_precision"] + metrics["chunk_text_recall"])) | |
return metrics | |
# --- Основная функция --- | |
def main(): | |
"""Основная функция скрипта.""" | |
args = parse_args() | |
print(f"Запуск оценки с ID: {args.run_id}") | |
print(f"Параметры: {vars(args)}") | |
# --- Кэширование Чанкинга --- | |
CACHE_DIR_PATH = Path(args.cache_dir) | |
try: | |
# Парсим параметры стратегии один раз | |
parsed_strategy_params = json.loads(args.strategy_params) | |
except json.JSONDecodeError: | |
print(f"Предупреждение: Невалидный JSON в strategy_params: '{args.strategy_params}'. Используются параметры по умолчанию для хэша кэша.") | |
parsed_strategy_params = {} | |
chunking_hash = _get_chunking_cache_hash( | |
args.data_folder, | |
args.chunking_strategy, | |
args.process_tables, | |
parsed_strategy_params | |
) | |
chunks_df_cache_path = _get_cache_path(CACHE_DIR_PATH, chunking_hash, "chunks_df.parquet") | |
entities_cache_path = _get_cache_path(CACHE_DIR_PATH, chunking_hash, "final_entities.pkl") | |
chunks_df = None | |
all_entities = None | |
if chunks_df_cache_path.exists() and entities_cache_path.exists(): | |
print(f"Найден кэш чанкинга (hash: {chunking_hash}). Загрузка...") | |
try: | |
chunks_df = pd.read_parquet(chunks_df_cache_path) | |
with open(entities_cache_path, 'rb') as f: | |
all_entities = pickle.load(f) | |
print(f"Кэш чанкинга успешно загружен: {len(chunks_df)} чанков, {len(all_entities)} сущностей.") | |
except Exception as e: | |
print(f"Ошибка загрузки кэша чанкинга: {e}. Выполняем чанкинг заново.") | |
chunks_df = None | |
all_entities = None | |
if chunks_df is None or all_entities is None: | |
print("Кэш чанкинга не найден или поврежден. Выполнение чтения документов и чанкинга...") | |
# 1. Загрузка данных | |
documents_map = read_documents(args.data_folder) | |
if not documents_map: | |
print("Нет документов для обработки. Завершение.") | |
return | |
# 2. Чанкинг | |
chunks_df, all_entities = perform_chunking( | |
documents_map, | |
args.chunking_strategy, # Pass strategy | |
args.process_tables, # Pass table flag | |
args.strategy_params # Pass JSON string parameters | |
) | |
if chunks_df.empty: | |
print("После чанкинга не осталось чанков для обработки. Завершение.") | |
return | |
# Сохраняем результаты чанкинга в кэш | |
try: | |
print(f"Сохранение результатов чанкинга в кэш (hash: {chunking_hash})...") | |
# Убедимся, что директория кэша существует (на всякий случай) | |
chunks_df_cache_path.parent.mkdir(parents=True, exist_ok=True) | |
entities_cache_path.parent.mkdir(parents=True, exist_ok=True) | |
chunks_df.to_parquet(chunks_df_cache_path) | |
with open(entities_cache_path, 'wb') as f: | |
pickle.dump(all_entities, f) | |
print("Результаты чанкинга сохранены в кэш.") | |
except Exception as e: | |
print(f"Ошибка сохранения кэша чанкинга: {e}") | |
# --- Конец Кэширования Чанкинга --- | |
# Загружаем поисковый датасет (это нужно делать всегда, т.к. он не кэшируется здесь) | |
search_df, questions_to_embed = load_datasets(args.search_dataset_path) | |
# 3. Выполнение оценки (передаем загруженные или свежесгенерированные chunks_df и all_entities) | |
results_df = evaluate_run( | |
search_df, questions_to_embed, chunks_df, all_entities, | |
None, None, args # Передаем None для model и tokenizer | |
) | |
# 5. Сохранение результатов | |
if not results_df.empty: | |
os.makedirs(args.output_dir, exist_ok=True) | |
# output_filename = f"results_{args.run_id}.csv" | |
# Добавляем batch_id в имя файла для лучшей группировки | |
output_filename = f"results_{args.batch_id}_{args.run_id}.csv" | |
output_path = os.path.join(args.output_dir, output_filename) | |
try: | |
results_df.to_csv(output_path, index=False, encoding='utf-8') | |
print(f"Детальные результаты сохранены в: {output_path}") | |
except Exception as e: | |
print(f"Ошибка при сохранении результатов в {output_path}: {e}") | |
else: | |
print("Нет результатов для сохранения.") | |
if __name__ == "__main__": | |
main() | |