muryshev's picture
update
fd485d9
raw
history blame
59 kB
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Основной пайплайн для оценки качества RAG системы.
Этот скрипт выполняет один прогон оценки для заданных параметров:
- Чтение документов и датасетов вопросов/ответов.
- Чанкинг документов.
- Векторизация вопросов и чанков.
- Оценка релевантности чанков к пунктам из датасета (Chunk-level).
- Сборка контекста из релевантных чанков (Assembly-level).
- Оценка релевантности собранного контекста к эталонным ответам.
- Сохранение детальных метрик для данного прогона.
"""
import argparse
# Add necessary imports for caching
import hashlib
import json
import os
import pickle
import sys
from pathlib import Path
from typing import Any
from uuid import UUID, uuid4
import numpy as np
import pandas as pd
import torch
from fuzzywuzzy import fuzz
from sklearn.metrics.pairwise import cosine_similarity
from tqdm import tqdm
from transformers import AutoModel, AutoTokenizer
# --- Константы (могут быть переопределены аргументами) ---
DEFAULT_DATA_FOLDER = "data/input/docs"
DEFAULT_SEARCH_DATASET_PATH = "data/input/search_dataset_texts.xlsx"
DEFAULT_QA_DATASET_PATH = "data/input/question_answering.xlsx"
DEFAULT_MODEL_NAME = "intfloat/e5-base"
DEFAULT_BATCH_SIZE = 8
DEFAULT_DEVICE = "cuda:0" if torch.cuda.is_available() else "cpu"
DEFAULT_SIMILARITY_THRESHOLD = 0.7
DEFAULT_OUTPUT_DIR = "data/intermediate" # Директория для промежуточных результатов
DEFAULT_WORDS_PER_CHUNK = 50
DEFAULT_OVERLAP_WORDS = 25
DEFAULT_TOP_N = 20 # Значение N по умолчанию для топа чанков
# Add chunking strategy constant
DEFAULT_CHUNKING_STRATEGY = "fixed_size"
# Add cache directory constant
DEFAULT_CACHE_DIR = "data/cache"
# --- Добавление путей к библиотекам ---
# Добавляем путь к корневой папке проекта, чтобы можно было импортировать ntr_...
SCRIPT_DIR = Path(__file__).parent.resolve()
PROJECT_ROOT = SCRIPT_DIR.parent.parent # Перейти на два уровня вверх (scripts/testing -> scripts -> project root)
LIB_EXTRACTOR_PATH = PROJECT_ROOT / "lib" / "extractor"
sys.path.insert(0, str(LIB_EXTRACTOR_PATH))
# Добавляем путь к папке с ntr_text_fragmentation
sys.path.insert(0, str(LIB_EXTRACTOR_PATH / "ntr_text_fragmentation"))
# --- Импорты из локальных модулей ---
try:
from ntr_fileparser import ParsedDocument, UniversalParser
from ntr_text_fragmentation import Destructurer
from ntr_text_fragmentation.core.entity_repository import \
InMemoryEntityRepository
from ntr_text_fragmentation.core.injection_builder import InjectionBuilder
from ntr_text_fragmentation.models.chunk import Chunk
from ntr_text_fragmentation.models.document import DocumentAsEntity
from ntr_text_fragmentation.models.linker_entity import LinkerEntity
except ImportError as e:
print(f"Ошибка импорта локальных модулей: {e}")
print(f"Проверьте пути: Project Root: {PROJECT_ROOT}, Extractor Lib: {LIB_EXTRACTOR_PATH}")
sys.exit(1)
# --- Вспомогательные функции (аналогичные evaluate_chunking.py) ---
def _average_pool(
last_hidden_states: torch.Tensor, attention_mask: torch.Tensor
) -> torch.Tensor:
"""
Расчёт усредненного эмбеддинга по всем токенам.
(Копипаста из evaluate_chunking.py)
"""
last_hidden = last_hidden_states.masked_fill(
~attention_mask[..., None].bool(), 0.0
)
return last_hidden.sum(dim=1) / attention_mask.sum(dim=1)[..., None]
def calculate_chunk_overlap(chunk_text: str, punct_text: str) -> float:
"""
Рассчитывает степень перекрытия между чанком и пунктом.
(Копипаста из evaluate_chunking.py)
"""
if not chunk_text or not punct_text:
return 0.0
# Используем partial_ratio для лучшей обработки подстрок
return fuzz.partial_ratio(chunk_text, punct_text) / 100.0
# --- Функции загрузки и обработки данных ---
def parse_args():
"""Парсит аргументы командной строки."""
parser = argparse.ArgumentParser(description="Пайплайн оценки RAG системы")
# Пути к данным
parser.add_argument("--data-folder", type=str, default=DEFAULT_DATA_FOLDER,
help=f"Папка с документами (по умолчанию: {DEFAULT_DATA_FOLDER})")
parser.add_argument("--search-dataset-path", type=str, default=DEFAULT_SEARCH_DATASET_PATH,
help=f"Путь к датасету для поиска (по умолчанию: {DEFAULT_SEARCH_DATASET_PATH})")
parser.add_argument("--output-dir", type=str, default=DEFAULT_OUTPUT_DIR,
help=f"Папка для сохранения промежуточных результатов (по умолчанию: {DEFAULT_OUTPUT_DIR})")
parser.add_argument("--run-id", type=str, default=f"run_{uuid4()}",
help="Уникальный идентификатор запуска (по умолчанию: генерируется)")
# Параметры модели и векторизации
parser.add_argument("--model-name", type=str, default=DEFAULT_MODEL_NAME,
help=f"Название модели для векторизации (по умолчанию: {DEFAULT_MODEL_NAME})")
parser.add_argument("--batch-size", type=int, default=DEFAULT_BATCH_SIZE,
help=f"Размер батча для векторизации (по умолчанию: {DEFAULT_BATCH_SIZE})")
parser.add_argument("--device", type=str, default=DEFAULT_DEVICE, # type: ignore
help=f"Устройство для вычислений (по умолчанию: {DEFAULT_DEVICE})")
parser.add_argument("--use-sentence-transformers", action="store_true",
help="Использовать библиотеку sentence_transformers")
# Параметры чанкинга
parser.add_argument("--chunking-strategy", type=str, default=DEFAULT_CHUNKING_STRATEGY,
choices=list(Destructurer.STRATEGIES.keys()), # Use keys from Destructurer
help=f"Стратегия чанкинга (по умолчанию: {DEFAULT_CHUNKING_STRATEGY})")
parser.add_argument("--strategy-params", type=str, default='{}', # Default to empty JSON object
help="Параметры для стратегии чанкинга в формате JSON строки (например, '{\"words_per_chunk\": 50}')")
parser.add_argument("--no-process-tables", action="store_false", dest="process_tables",
help="Отключить обработку таблиц при чанкинге")
parser.set_defaults(process_tables=True) # Default is to process tables
# Параметры оценки
parser.add_argument("--similarity-threshold", type=float, default=DEFAULT_SIMILARITY_THRESHOLD,
help=f"Порог для нечеткого сравнения чанка и пункта (по умолчанию: {DEFAULT_SIMILARITY_THRESHOLD})")
parser.add_argument("--top-n", type=int, default=DEFAULT_TOP_N,
help=f"Количество топ-чанков для рассмотрения (по умолчанию: {DEFAULT_TOP_N})")
# Add cache directory argument
parser.add_argument("--cache-dir", type=str, default=DEFAULT_CACHE_DIR,
help=f"Директория для кэширования эмбеддингов и матриц схожести (по умолчанию: {DEFAULT_CACHE_DIR})")
# Параметры сборки контекста
parser.add_argument("--use-injection", action="store_true",
help="Выполнять ли сборку контекста и её оценку")
parser.add_argument("--use-qe", action="store_true",
help="Использовать столбец query_expansion вместо question для поиска (если он есть)")
parser.add_argument("--include-neighbors", action="store_true",
help="Включать ли соседние чанки (предыдущий/следующий) при сборке контекста")
# --- Добавляем аргумент для batch_id ---
parser.add_argument("--batch-id", type=str, default="batch_default",
help="Идентификатор серии запусков (передается из run_pipelines.py)")
# TODO: Добавить другие параметры при необходимости (например, параметры InjectionBuilder)
return parser.parse_args()
def read_documents(folder_path: str) -> dict[str, ParsedDocument]:
"""
Читает все документы из указанной папки и создает сущности.
Args:
folder_path: Путь к папке с документами
Returns:
Словарь {имя_файла: объект ParsedDocument}
"""
print(f"Чтение документов из {folder_path}...")
parser = UniversalParser()
documents_map = {}
doc_files = list(Path(folder_path).glob("*.docx"))
if not doc_files:
print(f"ВНИМАНИЕ: В папке {folder_path} не найдено *.docx файлов.")
return {}
for file_path in tqdm(doc_files, desc="Чтение документов"):
try:
doc_name = file_path.stem
# Парсим документ с помощью UniversalParser
parsed_document = parser.parse_by_path(str(file_path))
# Сохраняем распарсенный документ
documents_map[doc_name] = parsed_document
except Exception as e:
print(f"Ошибка при чтении файла {file_path}: {e}")
print(f"Прочитано документов: {len(documents_map)}")
return documents_map
def load_datasets(search_dataset_path: str) -> tuple[pd.DataFrame, pd.DataFrame]:
"""
Загружает датасет для поиска и готовит данные для векторизации.
Args:
search_dataset_path: Путь к Excel с пунктами для поиска.
Returns:
Кортеж: (полный DataFrame поискового датасета, DataFrame с уникальными вопросами для векторизации).
"""
print(f"Загрузка поискового датасета из {search_dataset_path}...")
try:
search_df = pd.read_excel(search_dataset_path)
print(f"Загружен поисковый датасет: {len(search_df)} строк, столбцы: {search_df.columns.tolist()}")
# Проверяем наличие обязательных столбцов
required_columns = ['id', 'question', 'text', 'filename']
missing_cols = [col for col in required_columns if col not in search_df.columns]
if missing_cols:
print(f"Ошибка: В поисковом датасете отсутствуют обязательные столбцы: {missing_cols}")
sys.exit(1)
# Преобразуем NaN в пустые строки для текстовых полей
# Добавляем 'query_expansion', если он есть, для обработки NaN
text_columns = ['question', 'text', 'item_type', 'filename']
if 'query_expansion' in search_df.columns:
text_columns.append('query_expansion')
for col in text_columns:
if col in search_df.columns:
search_df[col] = search_df[col].fillna('')
# Если необязательный item_type отсутствует, добавляем его пустым
elif col == 'item_type':
print(f"Предупреждение: столбец '{col}' отсутствует в поисковом датасете. Добавлен пустой столбец.")
search_df[col] = ''
# Убедимся, что 'id' имеет целочисленный тип
try:
search_df['id'] = search_df['id'].astype(int)
except ValueError as e:
print(f"Ошибка при приведении типов столбца 'id' в поисковом датасете: {e}. Убедитесь, что ID являются целыми числами.")
sys.exit(1)
except FileNotFoundError:
print(f"Ошибка: Поисковый датасет не найден по пути {search_dataset_path}")
sys.exit(1)
except Exception as e:
print(f"Ошибка при чтении поискового датасета: {e}")
sys.exit(1)
# Готовим DataFrame для векторизации уникальных вопросов
# Включаем query_expansion, если он есть
cols_for_embedding = ['id', 'question']
query_expansion_exists = 'query_expansion' in search_df.columns
if query_expansion_exists:
cols_for_embedding.append('query_expansion')
print("Столбец 'query_expansion' найден в поисковом датасете.")
else:
print("Столбец 'query_expansion' не найден в поисковом датасете.")
questions_to_embed = search_df[cols_for_embedding].drop_duplicates(subset=['id']).copy()
# Если query_expansion не существует, добавляем пустой столбец для единообразия
if not query_expansion_exists:
questions_to_embed['query_expansion'] = ''
print(f"Уникальных вопросов для векторизации: {len(questions_to_embed)}")
# Теперь search_df это и есть наш "объединенный" датасет (так как QA не используется)
return search_df, questions_to_embed
def perform_chunking(
documents_map: dict[str, ParsedDocument],
chunking_strategy: str,
process_tables: bool,
strategy_params_json: str # Expect JSON string
) -> tuple[pd.DataFrame, list[LinkerEntity]]:
"""
Выполняет чанкинг для всех документов.
Args:
documents_map: Словарь {имя_файла: сущность_документа}.
chunking_strategy: Имя используемой стратегии чанкинга.
process_tables: Флаг, указывающий, нужно ли обрабатывать таблицы.
strategy_params_json: Строка JSON с параметрами для стратегии.
Returns:
Кортеж: (DataFrame с чанками для поиска, список всех созданных сущностей LinkerEntity)
"""
print("Выполнение чанкинга...")
searchable_chunks_data = [] # Данные только для чанков с in_search_text
final_entities: list[LinkerEntity] = [] # Список для ВСЕХ сущностей (доки, чанки, связи и т.д.)
# Parse strategy parameters from JSON string
try:
chunking_params = json.loads(strategy_params_json)
print(f"Параметры для стратегии '{chunking_strategy}': {chunking_params}")
except json.JSONDecodeError as e:
print(f"Ошибка парсинга JSON для strategy-params: '{strategy_params_json}'. Используются параметры по умолчанию стратегии. Ошибка: {e}")
chunking_params = {} # Use strategy defaults if JSON is invalid
print(f"Используется стратегия чанкинга: '{chunking_strategy}'")
print(f"Обработка таблиц: {'Включена' if process_tables else 'Отключена'}")
for doc_name, parsed_doc in tqdm(documents_map.items(), desc="Чанкинг документов"):
try:
# Инициализируем Destructurer ВНУТРИ цикла для КАЖДОГО документа
destructurer = Destructurer(
document=parsed_doc,
process_tables=process_tables,
strategy_name=chunking_strategy, # Передаем имя стратегии при инициализации
**chunking_params # И параметры стратегии
)
# Destructure создает DocumentAsEntity, чанки, связи и возвращает их как LinkerEntity
new_entities = destructurer.destructure()
# Добавляем ВСЕ созданные сущности (сериализованные LinkerEntity) в общий список
final_entities.extend(new_entities)
# Собираем данные для DataFrame только из тех сущностей,
# у которых есть поле in_search_text (это наши чанки для поиска)
for entity in new_entities:
# Проверяем наличие атрибута 'in_search_text', а не тип
if hasattr(entity, 'in_search_text') and entity.in_search_text:
entity_data = {
'chunk_id': str(entity.id),
'doc_name': doc_name, # Имя исходного файла
'doc_id': str(entity.source_id), # ID сущности документа (DocumentAsEntity)
'text': entity.in_search_text, # Текст для векторизации и поиска
'type': entity.type, # Тип сущности (например, 'FixedSizeChunk')
'strategy_params': json.dumps(chunking_params, ensure_ascii=False),
}
searchable_chunks_data.append(entity_data)
except Exception as e:
# Логируем ошибку и продолжаем с остальными документами
print(f"\nОшибка при чанкинге документа {doc_name}: {e}")
import traceback
traceback.print_exc() # Печатаем traceback для детальной отладки
# Создаем DataFrame только из чанков, предназначенных для поиска
chunks_df = pd.DataFrame(searchable_chunks_data)
print(f"Создано чанков для поиска: {len(chunks_df)}")
# Возвращаем DataFrame с чанками для поиска и ПОЛНЫЙ список всех LinkerEntity
return chunks_df, final_entities
def setup_model_and_tokenizer(model_name: str, use_sentence_transformers: bool, device: str):
"""Инициализирует модель и токенизатор."""
print(f"Загрузка модели {model_name} на устройство {device}...")
if use_sentence_transformers:
try:
from sentence_transformers import SentenceTransformer
model = SentenceTransformer(model_name, device=device)
tokenizer = None # sentence_transformers не требует отдельного токенизатора
print("Используется SentenceTransformer.")
return model, tokenizer
except ImportError:
print("Ошибка: Библиотека sentence_transformers не установлена. Установите: pip install sentence-transformers")
sys.exit(1)
else:
try:
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModel.from_pretrained(model_name).to(device)
model.eval()
print("Используется AutoModel и AutoTokenizer из transformers.")
return model, tokenizer
except Exception as e:
print(f"Ошибка при загрузке модели {model_name} из transformers: {e}")
sys.exit(1)
def get_embeddings(
texts: list[str],
model,
tokenizer,
batch_size: int,
use_sentence_transformers: bool,
device: str
) -> np.ndarray:
"""Получает эмбеддинги для списка текстов."""
all_embeddings = []
desc = "Векторизация (Sentence Transformers)" if use_sentence_transformers else "Векторизация (Transformers)"
for i in tqdm(range(0, len(texts), batch_size), desc=desc):
batch_texts = texts[i:i+batch_size]
if not batch_texts:
continue
if use_sentence_transformers:
# Эмбеддинги через sentence_transformers
embeddings = model.encode(batch_texts, batch_size=len(batch_texts), show_progress_bar=False)
all_embeddings.append(embeddings)
else:
# Эмбеддинги через transformers с average pooling
try:
encoding = tokenizer(
batch_texts,
padding=True,
truncation=True,
max_length=512, # Стандартное ограничение для многих моделей
return_tensors="pt"
).to(device)
with torch.no_grad():
outputs = model(**encoding)
embeddings = _average_pool(outputs.last_hidden_state, encoding["attention_mask"])
all_embeddings.append(embeddings.cpu().numpy())
except Exception as e:
print(f"Ошибка при векторизации батча (индексы {i} - {i+batch_size}): {e}")
print(f"Тексты батча: {batch_texts[:2]}...")
# Добавляем нулевые векторы, чтобы не сломать vstack
# Определяем размер эмбеддинга
if all_embeddings:
embedding_dim = all_embeddings[0].shape[1]
else:
# Пытаемся получить размер из конфигурации модели
try:
embedding_dim = model.config.hidden_size
except AttributeError:
embedding_dim = 768 # Запасной вариант
print(f"Не удалось определить размер эмбеддинга, используется {embedding_dim}")
print(f"Добавление нулевых эмбеддингов размерности ({len(batch_texts)}, {embedding_dim})")
null_embeddings = np.zeros((len(batch_texts), embedding_dim), dtype=np.float32)
all_embeddings.append(null_embeddings)
if not all_embeddings:
print("ВНИМАНИЕ: Не удалось создать эмбеддинги.")
# Возвращаем пустой массив правильной формы, если возможно
try:
embedding_dim = model.config.hidden_size if not use_sentence_transformers else model.get_sentence_embedding_dimension()
except:
embedding_dim = 768
return np.empty((0, embedding_dim), dtype=np.float32)
# Объединяем эмбеддинги из всех батчей
try:
final_embeddings = np.vstack(all_embeddings)
except ValueError as e:
print(f"Ошибка при объединении эмбеддингов: {e}")
print("Размеры эмбеддингов в батчах:")
for i, emb_batch in enumerate(all_embeddings):
print(f" Батч {i}: {emb_batch.shape}")
# Попробуем определить ожидаемый размер и создать нулевой массив
if all_embeddings:
embedding_dim = all_embeddings[0].shape[1]
print(f"Возвращение нулевого массива размерности ({len(texts)}, {embedding_dim})")
return np.zeros((len(texts), embedding_dim), dtype=np.float32)
else:
return np.empty((0, 768), dtype=np.float32) # Запасной вариант
print(f"Получено эмбеддингов: {final_embeddings.shape}")
return final_embeddings
# --- Caching Helper Functions ---
def _get_params_hash(
model_name: str,
process_tables: bool | None = None,
strategy_params: dict | None = None # Expect the parsed dictionary
) -> str:
"""Создает MD5 хэш из переданных параметров."""
hasher = hashlib.md5()
hasher.update(model_name.encode())
# Add chunking strategy and table processing flag if provided
if process_tables is not None:
hasher.update(str(process_tables).encode())
# Add strategy parameters (sort items to ensure consistent hash)
if strategy_params:
sorted_params = sorted(strategy_params.items())
hasher.update(json.dumps(sorted_params).encode())
return hasher.hexdigest()
def _get_cache_path(cache_dir: Path, hash_str: str, filename: str) -> Path:
"""Формирует путь к файлу кэша, создавая поддиректории."""
# Используем первые 2 символа хэша для распределения по поддиректориям
# Это помогает избежать слишком большого количества файлов в одной директории
cache_subdir = cache_dir / hash_str[:2] / hash_str
cache_subdir.mkdir(parents=True, exist_ok=True)
return cache_subdir / filename
# --- Добавляем функцию для хэша чанкинга ---
def _get_chunking_cache_hash(
data_folder: str,
chunking_strategy: str,
process_tables: bool,
strategy_params: dict # Ожидаем словарь
) -> str:
"""Создает MD5 хэш для параметров чанкинга и папки с данными."""
hasher = hashlib.md5()
hasher.update(data_folder.encode())
hasher.update(chunking_strategy.encode())
hasher.update(str(process_tables).encode())
# Сортируем параметры для консистентности хэша
sorted_params = sorted(strategy_params.items())
hasher.update(json.dumps(sorted_params).encode())
return hasher.hexdigest()
# ---------------------------------------------
# --- Main Evaluation Function ---
def evaluate_run(
search_dataset: pd.DataFrame,
questions_to_embed: pd.DataFrame,
chunks_df: pd.DataFrame,
all_entities: list[LinkerEntity],
model: Any | None, # Принимаем None
tokenizer: Any | None, # Принимаем None
args: argparse.Namespace
) -> pd.DataFrame:
"""
Выполняет основной цикл оценки для одного набора параметров.
Args:
search_dataset: DataFrame поискового датасета.
questions_to_embed: DataFrame с уникальными вопросами для векторизации.
chunks_df: DataFrame с данными по чанкам.
all_entities: Список всех сущностей (документы, чанки, связи).
model: Модель для векторизации.
tokenizer: Токенизатор.
args: Аргументы командной строки.
Returns:
DataFrame с детальными метриками по каждому вопросу для этого запуска.
"""
print("Начало этапа оценки...")
# Переменные для модели и токенизатора, инициализируем None
loaded_model: Any | None = model
loaded_tokenizer: Any | None = tokenizer
# --- Caching Setup ---
print("Настройка кэширования...")
CACHE_DIR_PATH = Path(args.cache_dir)
model_slug = args.model_name.split('/')[-1] # Basic slug for filename clarity
# --- Определяем, какой текст использовать для эмбеддингов вопросов ---
# и устанавливаем флаг qe_active, который будет влиять на кэш
if args.use_qe and 'query_expansion' in questions_to_embed.columns and questions_to_embed['query_expansion'].notna().any(): # Check if column exists and has non-NA values
print("Используется Query Expansion (столбец 'query_expansion') для векторизации вопросов.")
query_texts_to_embed = questions_to_embed['query_expansion'].tolist()
qe_active = True
else:
print("Используется оригинальный текст вопроса (столбец 'question') для векторизации.")
query_texts_to_embed = questions_to_embed['question'].tolist()
qe_active = False
# Cache key for question embeddings (ЗАВИСИТ от модели и флага use_qe)
question_params_for_hash = {
'model_name': args.model_name,
'use_qe': qe_active # Добавляем фактическое использование QE в параметры для хэша
}
question_hash = hashlib.md5(json.dumps(question_params_for_hash, sort_keys=True).encode()).hexdigest()
question_embeddings_cache_path = _get_cache_path(
CACHE_DIR_PATH, question_hash, f"q_embeddings_{model_slug}_qe{qe_active}.npy"
)
# Cache key for chunk embeddings (depends on model and chunking)
chunk_hash = _get_params_hash(
args.model_name,
args.process_tables, # Include table flag
json.loads(args.strategy_params) # Pass parsed params dictionary
)
chunk_embeddings_cache_path = _get_cache_path(
CACHE_DIR_PATH, chunk_hash,
f"c_emb_{model_slug}_s-{args.chunking_strategy}_t{args.process_tables}_ph-{hashlib.md5(args.strategy_params.encode()).hexdigest()[:8]}.npy"
)
# Cache key for similarity matrix (depends on both sets of embeddings)
similarity_hash = f"{question_hash}_{chunk_hash}" # Combine hashes
similarity_cache_path = _get_cache_path(
CACHE_DIR_PATH, similarity_hash,
f"sim_{model_slug}_qe{qe_active}_ph-{hashlib.md5(args.strategy_params.encode()).hexdigest()[:8]}.npy" # Добавляем флаг QE в имя файла
)
# 1. Векторизация вопросов и чанков (с кэшем)
question_embeddings = None
needs_model_load = False # Флаг, указывающий, нужна ли загрузка модели
if question_embeddings_cache_path.exists():
try:
print(f"Загрузка кэшированных эмбеддингов вопросов из: {question_embeddings_cache_path}")
question_embeddings = np.load(question_embeddings_cache_path, allow_pickle=False)
if len(question_embeddings) != len(questions_to_embed):
print(f"Предупреждение: Размер кэша эмбеддингов вопросов не совпадает. Пересчет.")
question_embeddings = None
else:
print("Кэш эмбеддингов вопросов успешно загружен.")
except Exception as e:
print(f"Ошибка загрузки кэша эмбеддингов вопросов: {e}. Пересчет.")
question_embeddings = None
if question_embeddings is None:
needs_model_load = True # Требуется модель для генерации эмбеддингов
print("Векторизация вопросов (потребуется загрузка модели)...")
chunk_embeddings = None
if chunk_embeddings_cache_path.exists():
try:
print(f"Загрузка кэшированных эмбеддингов чанков из: {chunk_embeddings_cache_path}")
chunk_embeddings = np.load(chunk_embeddings_cache_path, allow_pickle=False)
if len(chunk_embeddings) != len(chunks_df):
print(f"Предупреждение: Размер кэша эмбеддингов чанков не совпадает. Пересчет.")
chunk_embeddings = None
else:
print("Кэш эмбеддингов чанков успешно загружен.")
except Exception as e:
print(f"Ошибка загрузки кэша эмбеддингов чанков: {e}. Пересчет.")
chunk_embeddings = None
if chunk_embeddings is None:
needs_model_load = True # Требуется модель для генерации эмбеддингов
print("Векторизация чанков (потребуется загрузка модели)...")
# --- Отложенная загрузка модели, если необходимо ---
if needs_model_load and loaded_model is None:
print("\n--- Загрузка модели и токенизатора (т.к. кэш эмбеддингов отсутствует) ---")
loaded_model, loaded_tokenizer = setup_model_and_tokenizer(
args.model_name, args.use_sentence_transformers, args.device
)
print("--- Модель и токенизатор загружены ---\n")
# --- Повторная генерация эмбеддингов, если они не загрузились из кэша ---
if question_embeddings is None:
if loaded_model is None:
print("Критическая ошибка: Модель не загружена, но требуется для векторизации вопросов!")
# Возвращаем пустой DataFrame или выбрасываем исключение
return pd.DataFrame()
print("Повторная векторизация вопросов...")
question_embeddings = get_embeddings(
query_texts_to_embed,
loaded_model, loaded_tokenizer, args.batch_size, args.use_sentence_transformers, args.device
)
if question_embeddings.shape[0] > 0:
try:
print(f"Сохранение эмбеддингов вопросов в кэш: {question_embeddings_cache_path}")
np.save(question_embeddings_cache_path, question_embeddings, allow_pickle=False)
except Exception as e:
print(f"Не удалось сохранить кэш эмбеддингов вопросов: {e}")
if chunk_embeddings is None:
if loaded_model is None:
print("Критическая ошибка: Модель не загружена, но требуется для векторизации чанков!")
return pd.DataFrame()
print("Повторная векторизация чанков...")
chunk_texts = chunks_df['text'].fillna('').astype(str).tolist()
chunk_embeddings = get_embeddings(
chunk_texts,
loaded_model, loaded_tokenizer, args.batch_size, args.use_sentence_transformers, args.device
)
if chunk_embeddings.shape[0] > 0:
try:
print(f"Сохранение эмбеддингов чанков в кэш: {chunk_embeddings_cache_path}")
np.save(chunk_embeddings_cache_path, chunk_embeddings, allow_pickle=False)
except Exception as e:
print(f"Не удалось сохранить кэш эмбеддингов чанков: {e}")
# Проверка совпадения количества эмбеддингов и данных
if len(question_embeddings) != len(questions_to_embed):
print(f"Ошибка: Количество эмбеддингов вопросов ({len(question_embeddings)}) не совпадает с количеством уникальных вопросов ({len(questions_to_embed)}).")
# Можно либо прервать выполнение, либо попытаться исправить
# Например, взять первые N эмбеддингов, но это может быть некорректно
sys.exit(1)
if len(chunk_embeddings) != len(chunks_df):
print(f"Ошибка: Количество эмбеддингов чанков ({len(chunk_embeddings)}) не совпадает с количеством чанков в DataFrame ({len(chunks_df)}).")
# Попытка исправить (если ошибка небольшая) или выход
if abs(len(chunk_embeddings) - len(chunks_df)) < 5:
print("Попытка обрезать лишние эмбеддинги/данные...")
min_len = min(len(chunk_embeddings), len(chunks_df))
chunk_embeddings = chunk_embeddings[:min_len]
chunks_df = chunks_df.iloc[:min_len]
print(f"Размеры выровнены до {min_len}")
else:
sys.exit(1)
# Создаем маппинг ID вопроса к индексу в эмбеддингах
question_id_to_idx = {
row['id']: i for i, (_, row) in enumerate(questions_to_embed.iterrows())
}
# 2. Расчет косинусной близости
print("Расчет косинусной близости...")
# Проверка на пустые эмбеддинги
if question_embeddings.shape[0] == 0 or chunk_embeddings.shape[0] == 0:
print("Ошибка: Отсутствуют эмбеддинги вопросов или чанков для расчета близости.")
# Возвращаем пустой DataFrame или обрабатываем ошибку иначе
return pd.DataFrame()
similarity_matrix = cosine_similarity(question_embeddings, chunk_embeddings)
# 3. Инициализация InjectionBuilder (если нужно)
injection_builder = None
if args.use_injection:
print("Инициализация InjectionBuilder...")
repository = InMemoryEntityRepository(all_entities)
injection_builder = InjectionBuilder(repository)
# TODO: Зарегистрировать стратегии, если необходимо
# builder.register_strategy(...)
# 4. Цикл по уникальным вопросам для оценки
results = []
print(f"Оценка для {len(questions_to_embed)} уникальных вопросов...")
for question_id_iter, question_data in tqdm(questions_to_embed.iterrows(), total=len(questions_to_embed), desc="Оценка вопросов"): # Renamed loop variable
q_id = question_data['id']
q_text = question_data['question']
# Получаем все строки из исходного датасета для этого вопроса
question_rows = search_dataset[search_dataset['id'] == q_id] # Use search_dataset
if question_rows.empty:
print(f"Предупреждение: Нет данных в search_dataset для вопроса ID={q_id}")
continue
# Получаем пункты (relevant items)
puncts = question_rows['text'].tolist()
# reference_answer больше не используется и не извлекается
# Получаем индекс вопроса в матрице близости
if q_id not in question_id_to_idx:
print(f"Предупреждение: Вопрос ID={q_id} не найден в маппинге эмбеддингов.")
continue
question_idx = question_id_to_idx[q_id]
# --- Оценка на уровне чанков (Chunk-level) ---
chunk_level_metrics = evaluate_chunk_relevance(
q_id, question_idx, puncts,
similarity_matrix, chunks_df, args.top_n, args.similarity_threshold
)
# --- Оценка на уровне сборки (Assembly-level) ---
# Удаляем assembly_relevance, основанный на reference_answer
assembly_level_metrics = {} # Start with an empty dict for assembly metrics
assembled_context = ""
top_chunk_indices = chunk_level_metrics.get("top_chunk_ids", []) # Get indices first
neighbors_included = False # Flag to log
if args.use_injection and injection_builder and top_chunk_indices:
try:
# Преобразуем ID строк обратно в UUID чанков
top_chunk_uuids = [UUID(chunks_df.iloc[idx]['chunk_id']) for idx in top_chunk_indices]
final_chunk_uuids_for_assembly = set(top_chunk_uuids) # Start with top chunks
# --- Добавляем соседей, если нужно ---
if args.include_neighbors:
neighbors_included = True
# --- Убираем логирование индексов ---
neighbor_chunks = repository.get_neighboring_chunks(chunk_ids=top_chunk_uuids, max_distance=1)
neighbor_ids = {neighbor.id for neighbor in neighbor_chunks}
# --- Логирование до/после добавления ID соседей ---
print(f" [DEBUG QID {q_id}] Кол-во ID до добавления соседей: {len(final_chunk_uuids_for_assembly)}")
print(f" [DEBUG QID {q_id}] Кол-во найденных ID соседей: {len(neighbor_ids)}")
final_chunk_uuids_for_assembly.update(neighbor_ids)
print(f" [DEBUG QID {q_id}] Кол-во ID после добавления соседей: {len(final_chunk_uuids_for_assembly)}")
# --- Конец логирования ---
# --- Убираем логирование индексов ---
else:
# --- Убираем логирование индексов ---
pass # Ничего не делаем, если соседи не включены
# Собираем контекст
# Передаем финальный набор UUID (уникальный)
assembled_context = injection_builder.build(
filtered_entities=list(final_chunk_uuids_for_assembly),
# chunk_scores= {chunks_df.loc[idx, 'chunk_id']: sim for idx, sim in zip(top_chunk_ids_for_assembly, chunk_level_metrics.get('top_chunk_similarities',[]))} # Можно добавить веса
)
# --- Новая метрика: Assembly Punct Recall ---
# Оцениваем, сколько пунктов из датасета найдено в собранном контексте
# (по вашей идее: пункт считается найденным, если хотя бы одна его часть,
# разделенная переносом строки, найдена в контексте)
assembly_found_puncts = 0
assembly_total_puncts = len(puncts)
if assembly_total_puncts > 0 and assembled_context:
# Итерируемся по каждому исходному пункту
for punct_text in puncts:
# Разбиваем пункт на части по переносу строки
# Убираем пустые строки, которые могут появиться из-за двойных переносов
punct_parts = [part for part in punct_text.split('\n') if part.strip()]
# Если пункт пустой или состоит только из пробельных символов после разбивки,
# пропускаем его (не считаем ни найденным, ни не найденным в контексте recall)
if not punct_parts:
assembly_total_puncts -= 1 # Уменьшаем общее число пунктов для расчета recall
continue
is_punct_found = False
# Итерируемся по частям пункта
for part_text in punct_parts:
# Сравниваем КАЖДУЮ ЧАСТЬ пункта с собранным контекстом
if calculate_chunk_overlap(assembled_context, part_text.strip()) >= args.similarity_threshold:
# Если ХОТЯ БЫ ОДНА часть найдена, считаем ВЕСЬ пункт найденным
is_punct_found = True
break # Дальше части этого пункта можно не проверять
# Если флаг is_punct_found стал True, увеличиваем счетчик найденных пунктов
if is_punct_found:
assembly_found_puncts += 1
# Рассчитываем recall, только если были валидные пункты для проверки
if assembly_total_puncts > 0:
assembly_level_metrics["assembly_punct_recall"] = assembly_found_puncts / assembly_total_puncts
else:
assembly_level_metrics["assembly_punct_recall"] = 0.0 # Или можно None, если нет валидных пунктов
else:
assembly_level_metrics["assembly_punct_recall"] = 0.0
# Добавляем сам текст сборки для возможного анализа (усеченный)
assembly_level_metrics["assembled_context_preview"] = assembled_context[:500] + ("..." if len(assembled_context) > 500 else "")
except Exception as e:
print(f"Ошибка при сборке/оценке контекста для вопроса ID={q_id}: {e}")
# Записываем None или 0, чтобы не прерывать процесс
assembly_level_metrics["assembly_punct_recall"] = None # Indicate error
assembly_level_metrics["assembled_context_preview"] = f"Error during assembly: {e}"
# Собираем все метрики для вопроса
question_result = {
"run_id": args.run_id,
"batch_id": args.batch_id, # --- Добавляем batch_id в результаты ---
"question_id": q_id,
"question_text": q_text,
# Параметры запуска
"model_name": args.model_name,
"chunking_strategy": args.chunking_strategy, # Log strategy
"process_tables": args.process_tables, # Log table flag
"strategy_params": args.strategy_params, # Log JSON string
"top_n": args.top_n,
"use_injection": args.use_injection,
"use_qe": qe_active, # Log QE status
"neighbors_included": neighbors_included, # Log neighbor flag
"similarity_threshold": args.similarity_threshold,
# Метрики Chunk-level
**chunk_level_metrics,
# Метрики Assembly-level (теперь с recall по пунктам)
**assembly_level_metrics,
# Тексты для отладки (эталонный ответ удален, сборка добавлена выше)
# "assembled_context": assembled_context[:500] + "..." if assembled_context else "",
}
results.append(question_result)
print("Оценка завершена.")
return pd.DataFrame(results)
def evaluate_chunk_relevance(
question_id: int,
question_idx: int,
puncts: list[str],
similarity_matrix: np.ndarray,
chunks_df: pd.DataFrame,
top_n: int,
similarity_threshold: float
) -> dict:
"""
Оценивает релевантность чанков для одного вопроса.
(Адаптировано из evaluate_for_top_n_with_mapping в evaluate_chunking.py)
Возвращает словарь с метриками для этого вопроса.
"""
metrics = {
"chunk_text_precision": 0.0,
"chunk_text_recall": 0.0,
"chunk_text_f1": 0.0,
"found_puncts": 0,
"total_puncts": len(puncts),
"relevant_chunks": 0,
"total_chunks_in_top_n": 0,
"top_chunk_ids": [], # Индексы строк в chunks_df
"top_chunk_similarities": [],
}
if chunks_df.empty or similarity_matrix.shape[1] == 0:
print(f"Предупреждение (QID {question_id}): Нет чанков для оценки.")
return metrics
# Получаем схожести всех чанков с текущим вопросом
question_similarities = similarity_matrix[question_idx, :]
# Сортируем чанки по схожести и берем top_n
# argsort возвращает индексы элементов, которые бы отсортировали массив
# Берем последние N индексов (-top_n:) и разворачиваем ([::-1]) для убывания
# Добавляем проверку на случай если top_n > количества чанков
if top_n >= similarity_matrix.shape[1]:
sorted_chunk_indices = np.argsort(question_similarities)[::-1] # Берем все, сортируем по убыванию
else:
sorted_chunk_indices = np.argsort(question_similarities)[-top_n:][::-1]
# Ограничиваем top_n, если чанков меньше (это должно быть сделано выше, но дублируем для надежности)
actual_top_n = min(top_n, len(sorted_chunk_indices))
top_chunk_indices = sorted_chunk_indices[:actual_top_n]
# Сохраняем ID и схожести топ-чанков
metrics["top_chunk_ids"] = top_chunk_indices.tolist()
metrics["top_chunk_similarities"] = question_similarities[top_chunk_indices].tolist()
# Отбираем данные топ-чанков
top_chunks_df = chunks_df.iloc[top_chunk_indices]
metrics["total_chunks_in_top_n"] = len(top_chunks_df)
if metrics["total_chunks_in_top_n"] == 0:
return metrics # Если нет топ-чанков, метрики остаются нулевыми
# Оценка на основе текста (пунктов)
punct_found = [False] * metrics["total_puncts"]
question_relevant_chunks = 0
for i, (idx, chunk_row) in enumerate(top_chunks_df.iterrows()):
chunk_text = chunk_row['text']
is_relevant_to_punct = False
for j, punct_text in enumerate(puncts):
overlap = calculate_chunk_overlap(chunk_text, punct_text)
if overlap >= similarity_threshold:
is_relevant_to_punct = True
punct_found[j] = True
if is_relevant_to_punct:
question_relevant_chunks += 1
metrics["found_puncts"] = sum(punct_found)
metrics["relevant_chunks"] = question_relevant_chunks
if metrics["total_chunks_in_top_n"] > 0:
metrics["chunk_text_precision"] = metrics["relevant_chunks"] / metrics["total_chunks_in_top_n"]
if metrics["total_puncts"] > 0:
metrics["chunk_text_recall"] = metrics["found_puncts"] / metrics["total_puncts"]
if metrics["chunk_text_precision"] + metrics["chunk_text_recall"] > 0:
metrics["chunk_text_f1"] = (2 * metrics["chunk_text_precision"] * metrics["chunk_text_recall"] /
(metrics["chunk_text_precision"] + metrics["chunk_text_recall"]))
return metrics
# --- Основная функция ---
def main():
"""Основная функция скрипта."""
args = parse_args()
print(f"Запуск оценки с ID: {args.run_id}")
print(f"Параметры: {vars(args)}")
# --- Кэширование Чанкинга ---
CACHE_DIR_PATH = Path(args.cache_dir)
try:
# Парсим параметры стратегии один раз
parsed_strategy_params = json.loads(args.strategy_params)
except json.JSONDecodeError:
print(f"Предупреждение: Невалидный JSON в strategy_params: '{args.strategy_params}'. Используются параметры по умолчанию для хэша кэша.")
parsed_strategy_params = {}
chunking_hash = _get_chunking_cache_hash(
args.data_folder,
args.chunking_strategy,
args.process_tables,
parsed_strategy_params
)
chunks_df_cache_path = _get_cache_path(CACHE_DIR_PATH, chunking_hash, "chunks_df.parquet")
entities_cache_path = _get_cache_path(CACHE_DIR_PATH, chunking_hash, "final_entities.pkl")
chunks_df = None
all_entities = None
if chunks_df_cache_path.exists() and entities_cache_path.exists():
print(f"Найден кэш чанкинга (hash: {chunking_hash}). Загрузка...")
try:
chunks_df = pd.read_parquet(chunks_df_cache_path)
with open(entities_cache_path, 'rb') as f:
all_entities = pickle.load(f)
print(f"Кэш чанкинга успешно загружен: {len(chunks_df)} чанков, {len(all_entities)} сущностей.")
except Exception as e:
print(f"Ошибка загрузки кэша чанкинга: {e}. Выполняем чанкинг заново.")
chunks_df = None
all_entities = None
if chunks_df is None or all_entities is None:
print("Кэш чанкинга не найден или поврежден. Выполнение чтения документов и чанкинга...")
# 1. Загрузка данных
documents_map = read_documents(args.data_folder)
if not documents_map:
print("Нет документов для обработки. Завершение.")
return
# 2. Чанкинг
chunks_df, all_entities = perform_chunking(
documents_map,
args.chunking_strategy, # Pass strategy
args.process_tables, # Pass table flag
args.strategy_params # Pass JSON string parameters
)
if chunks_df.empty:
print("После чанкинга не осталось чанков для обработки. Завершение.")
return
# Сохраняем результаты чанкинга в кэш
try:
print(f"Сохранение результатов чанкинга в кэш (hash: {chunking_hash})...")
# Убедимся, что директория кэша существует (на всякий случай)
chunks_df_cache_path.parent.mkdir(parents=True, exist_ok=True)
entities_cache_path.parent.mkdir(parents=True, exist_ok=True)
chunks_df.to_parquet(chunks_df_cache_path)
with open(entities_cache_path, 'wb') as f:
pickle.dump(all_entities, f)
print("Результаты чанкинга сохранены в кэш.")
except Exception as e:
print(f"Ошибка сохранения кэша чанкинга: {e}")
# --- Конец Кэширования Чанкинга ---
# Загружаем поисковый датасет (это нужно делать всегда, т.к. он не кэшируется здесь)
search_df, questions_to_embed = load_datasets(args.search_dataset_path)
# 3. Выполнение оценки (передаем загруженные или свежесгенерированные chunks_df и all_entities)
results_df = evaluate_run(
search_df, questions_to_embed, chunks_df, all_entities,
None, None, args # Передаем None для model и tokenizer
)
# 5. Сохранение результатов
if not results_df.empty:
os.makedirs(args.output_dir, exist_ok=True)
# output_filename = f"results_{args.run_id}.csv"
# Добавляем batch_id в имя файла для лучшей группировки
output_filename = f"results_{args.batch_id}_{args.run_id}.csv"
output_path = os.path.join(args.output_dir, output_filename)
try:
results_df.to_csv(output_path, index=False, encoding='utf-8')
print(f"Детальные результаты сохранены в: {output_path}")
except Exception as e:
print(f"Ошибка при сохранении результатов в {output_path}: {e}")
else:
print("Нет результатов для сохранения.")
if __name__ == "__main__":
main()