generic-chatbot-backend / scripts /testing /aggregate_results.py
muryshev's picture
update
fd485d9
raw
history blame
29.7 kB
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Скрипт для агрегации и анализа результатов множества запусков pipeline.py.
Читает все CSV-файлы из директории промежуточных результатов,
объединяет их и вычисляет агрегированные метрики:
- Weighted (усредненные по всем вопросам, взвешенные по количеству пунктов/чанков/документов)
- Macro (усредненные по вопросам - сначала считаем метрику для каждого вопроса, потом усредняем)
- Micro (считаем общие TP, FP, FN по всем вопросам, потом вычисляем метрики)
Результаты сохраняются в один Excel-файл с несколькими листами.
"""
import argparse
import glob
# Импорт для обработки JSON строк
import os
import pandas as pd
from openpyxl import Workbook
from openpyxl.styles import Alignment, Border, Font, PatternFill, Side
from openpyxl.utils import get_column_letter
from openpyxl.utils.dataframe import dataframe_to_rows
# Прогресс-бар
from tqdm import tqdm
# --- Настройки ---
DEFAULT_INTERMEDIATE_DIR = "data/intermediate" # Откуда читать CSV
DEFAULT_OUTPUT_DIR = "data/output" # Куда сохранять итоговый Excel
DEFAULT_OUTPUT_FILENAME = "aggregated_results.xlsx"
# --- Маппинг названий столбцов на русский язык ---
COLUMN_NAME_MAPPING = {
# Параметры запуска из pipeline.py
'run_id': 'ID Запуска',
'model_name': 'Модель',
'chunking_strategy': 'Стратегия Чанкинга',
'strategy_params': 'Параметры Стратегии',
'process_tables': 'Обраб. Таблиц',
'top_n': 'Top N',
'use_injection': 'Сборка Контекста',
'use_qe': 'Query Expansion',
'neighbors_included': 'Вкл. Соседей',
'similarity_threshold': 'Порог Схожести',
# Идентификаторы из датасета (для детальных результатов)
'question_id': 'ID Вопроса',
'question_text': 'Текст Вопроса',
# Детальные метрики из pipeline.py
'chunk_text_precision': 'Точность (Чанк-Текст)',
'chunk_text_recall': 'Полнота (Чанк-Текст)',
'chunk_text_f1': 'F1 (Чанк-Текст)',
'found_puncts': 'Найдено Пунктов',
'total_puncts': 'Всего Пунктов',
'relevant_chunks': 'Релевантных Чанков',
'total_chunks_in_top_n': 'Всего Чанков в Топ-N',
'assembly_punct_recall': 'Полнота (Сборка-Пункт)',
'assembled_context_preview': 'Предпросмотр Сборки',
# 'top_chunk_ids': 'Индексы Топ-Чанков', # Списки, могут плохо отображаться
# 'top_chunk_similarities': 'Схожести Топ-Чанков', # Списки
# Агрегированные метрики (добавляются в calculate_aggregated_metrics)
'weighted_chunk_text_precision': 'Weighted Точность (Чанк-Текст)',
'weighted_chunk_text_recall': 'Weighted Полнота (Чанк-Текст)',
'weighted_chunk_text_f1': 'Weighted F1 (Чанк-Текст)',
'weighted_assembly_punct_recall': 'Weighted Полнота (Сборка-Пункт)',
'macro_chunk_text_precision': 'Macro Точность (Чанк-Текст)',
'macro_chunk_text_recall': 'Macro Полнота (Чанк-Текст)',
'macro_chunk_text_f1': 'Macro F1 (Чанк-Текст)',
'macro_assembly_punct_recall': 'Macro Полнота (Сборка-Пункт)',
'micro_text_precision': 'Micro Точность (Текст)',
'micro_text_recall': 'Micro Полнота (Текст)',
'micro_text_f1': 'Micro F1 (Текст)',
}
def parse_args():
"""Парсит аргументы командной строки."""
parser = argparse.ArgumentParser(description="Агрегация результатов оценочных пайплайнов")
parser.add_argument("--intermediate-dir", type=str, default=DEFAULT_INTERMEDIATE_DIR,
help=f"Директория с промежуточными CSV результатами (по умолчанию: {DEFAULT_INTERMEDIATE_DIR})")
parser.add_argument("--output-dir", type=str, default=DEFAULT_OUTPUT_DIR,
help=f"Директория для сохранения итогового Excel файла (по умолчанию: {DEFAULT_OUTPUT_DIR})")
parser.add_argument("--output-filename", type=str, default=DEFAULT_OUTPUT_FILENAME,
help=f"Имя выходного Excel файла (по умолчанию: {DEFAULT_OUTPUT_FILENAME})")
parser.add_argument("--latest-batch-only", action="store_true",
help="Агрегировать результаты только для последнего batch_id")
return parser.parse_args()
def load_intermediate_results(intermediate_dir: str) -> pd.DataFrame:
"""Загружает все CSV файлы из указанной директории."""
print(f"Загрузка промежуточных результатов из: {intermediate_dir}")
csv_files = glob.glob(os.path.join(intermediate_dir, "results_*.csv"))
if not csv_files:
print(f"ВНИМАНИЕ: В директории {intermediate_dir} не найдено файлов 'results_*.csv'.")
return pd.DataFrame()
all_data = []
for f in csv_files:
try:
df = pd.read_csv(f)
all_data.append(df)
print(f" Загружен файл: {os.path.basename(f)} ({len(df)} строк)")
except Exception as e:
print(f"Ошибка при чтении файла {f}: {e}")
if not all_data:
print("Не удалось загрузить ни одного файла с результатами.")
return pd.DataFrame()
combined_df = pd.concat(all_data, ignore_index=True)
print(f"Всего загружено строк: {len(combined_df)}")
print(f"Найденные колонки: {combined_df.columns.tolist()}")
# Преобразуем типы данных для надежности
numeric_cols = [
'chunk_text_precision', 'chunk_text_recall', 'chunk_text_f1',
'found_puncts', 'total_puncts', 'relevant_chunks',
'total_chunks_in_top_n',
'assembly_punct_recall',
'similarity_threshold', 'top_n',
]
for col in numeric_cols:
if col in combined_df.columns:
combined_df[col] = pd.to_numeric(combined_df[col], errors='coerce')
boolean_cols = [
'use_injection',
'process_tables',
'use_qe',
'neighbors_included'
]
for col in boolean_cols:
if col in combined_df.columns:
# Пытаемся конвертировать в bool, обрабатывая строки 'True'/'False'
if combined_df[col].dtype == 'object':
combined_df[col] = combined_df[col].astype(str).str.lower().map({'true': True, 'false': False}).fillna(False)
combined_df[col] = combined_df[col].astype(bool)
# Заполним пропуски в числовых колонках нулями (например, если метрики не посчитались)
combined_df[numeric_cols] = combined_df[numeric_cols].fillna(0)
# --- Обработка batch_id ---
if 'batch_id' in combined_df.columns:
# Приводим к строке и заполняем NaN
combined_df['batch_id'] = combined_df['batch_id'].astype(str).fillna('unknown_batch')
else:
# Если колонки нет, создаем ее
print("Предупреждение: Колонка 'batch_id' отсутствует в загруженных данных. Добавлена со значением 'unknown_batch'.")
combined_df['batch_id'] = 'unknown_batch'
# --------------------------
# Переименовываем столбцы в русские названия ДО возврата
# Отбираем только те колонки, для которых есть перевод
columns_to_rename = {eng: rus for eng, rus in COLUMN_NAME_MAPPING.items() if eng in combined_df.columns}
combined_df = combined_df.rename(columns=columns_to_rename)
print(f"Столбцы переименованы. Новые колонки: {combined_df.columns.tolist()}")
return combined_df
def calculate_aggregated_metrics(df: pd.DataFrame) -> pd.DataFrame:
"""
Вычисляет агрегированные метрики (Weighted, Macro, Micro)
для каждой уникальной комбинации параметров запуска.
Ожидает DataFrame с русскими названиями колонок.
"""
if df.empty:
return pd.DataFrame()
# Определяем параметры, по которым будем группировать (ИСПОЛЬЗУЕМ РУССКИЕ НАЗВАНИЯ)
grouping_params_rus = [
COLUMN_NAME_MAPPING.get('model_name', 'Модель'),
COLUMN_NAME_MAPPING.get('chunking_strategy', 'Стратегия Чанкинга'),
COLUMN_NAME_MAPPING.get('strategy_params', 'Параметры Стратегии'),
COLUMN_NAME_MAPPING.get('process_tables', 'Обраб. Таблиц'),
COLUMN_NAME_MAPPING.get('top_n', 'Top N'),
COLUMN_NAME_MAPPING.get('use_injection', 'Сборка Контекста'),
COLUMN_NAME_MAPPING.get('use_qe', 'Query Expansion'),
COLUMN_NAME_MAPPING.get('neighbors_included', 'Вкл. Соседей'),
COLUMN_NAME_MAPPING.get('similarity_threshold', 'Порог Схожести')
]
# Проверяем наличие всех колонок для группировки (с русскими именами)
missing_cols = [col for col in grouping_params_rus if col not in df.columns]
if missing_cols:
print(f"Ошибка: Отсутствуют необходимые колонки для группировки (русские): {missing_cols}")
# Удаляем отсутствующие колонки из списка группировки
grouping_params_rus = [col for col in grouping_params_rus if col not in missing_cols]
if not grouping_params_rus:
print("Невозможно выполнить группировку.")
return pd.DataFrame()
print(f"Группировка по параметрам (русские): {grouping_params_rus}")
# Используем grouping_params_rus для группировки
grouped = df.groupby(grouping_params_rus)
aggregated_results = []
# Итерируемся по каждой группе (комбинации параметров)
for params, group_df in tqdm(grouped, desc="Расчет агрегированных метрик"):
# Начинаем со словаря параметров (уже с русскими именами)
agg_result = dict(zip(grouping_params_rus, params))
# --- Метрики для усреднения/взвешивания (РУССКИЕ НАЗВАНИЯ) ---
chunk_prec_col = COLUMN_NAME_MAPPING.get('chunk_text_precision', 'Точность (Чанк-Текст)')
chunk_rec_col = COLUMN_NAME_MAPPING.get('chunk_text_recall', 'Полнота (Чанк-Текст)')
chunk_f1_col = COLUMN_NAME_MAPPING.get('chunk_text_f1', 'F1 (Чанк-Текст)')
assembly_rec_col = COLUMN_NAME_MAPPING.get('assembly_punct_recall', 'Полнота (Сборка-Пункт)')
total_chunks_col = COLUMN_NAME_MAPPING.get('total_chunks_in_top_n', 'Всего Чанков в Топ-N')
total_puncts_col = COLUMN_NAME_MAPPING.get('total_puncts', 'Всего Пунктов')
found_puncts_col = COLUMN_NAME_MAPPING.get('found_puncts', 'Найдено Пунктов') # Для micro
relevant_chunks_col = COLUMN_NAME_MAPPING.get('relevant_chunks', 'Релевантных Чанков') # Для micro
# Колонки, которые должны существовать для расчетов
required_metric_cols = [chunk_prec_col, chunk_rec_col, chunk_f1_col, assembly_rec_col]
required_count_cols = [total_chunks_col, total_puncts_col, found_puncts_col, relevant_chunks_col]
existing_metric_cols = [m for m in required_metric_cols if m in group_df.columns]
existing_count_cols = [c for c in required_count_cols if c in group_df.columns]
# --- Macro метрики (Простое усреднение метрик по вопросам) ---
if existing_metric_cols:
macro_metrics = group_df[existing_metric_cols].mean().rename(
# Генерируем имя 'Macro Имя Метрики'
lambda x: COLUMN_NAME_MAPPING.get(f"macro_{{key}}".format(key=next((k for k, v in COLUMN_NAME_MAPPING.items() if v == x), None)), f"Macro {x}")
).to_dict()
agg_result.update(macro_metrics)
else:
print(f"Предупреждение: Пропуск Macro метрик для группы {params}, нет колонок метрик.")
# --- Weighted метрики (Взвешенное усреднение) ---
weighted_chunk_precision = 0.0
weighted_chunk_recall = 0.0
weighted_assembly_recall = 0.0
weighted_chunk_f1 = 0.0
# Проверяем наличие необходимых колонок для взвешенного расчета
can_calculate_weighted = True
if chunk_prec_col not in existing_metric_cols or total_chunks_col not in existing_count_cols:
print(f"Предупреждение: Пропуск Weighted Точность (Чанк-Текст) для группы {params}, отсутствуют {chunk_prec_col} или {total_chunks_col}.")
can_calculate_weighted = False
if chunk_rec_col not in existing_metric_cols or total_puncts_col not in existing_count_cols:
print(f"Предупреждение: Пропуск Weighted Полнота (Чанк-Текст) для группы {params}, отсутствуют {chunk_rec_col} или {total_puncts_col}.")
can_calculate_weighted = False
if assembly_rec_col not in existing_metric_cols or total_puncts_col not in existing_count_cols:
print(f"Предупреждение: Пропуск Weighted Полнота (Сборка-Пункт) для группы {params}, отсутствуют {assembly_rec_col} или {total_puncts_col}.")
# Не сбрасываем can_calculate_weighted, т.к. другие weighted могут посчитаться
if can_calculate_weighted:
total_chunks_sum = group_df[total_chunks_col].sum()
total_puncts_sum = group_df[total_puncts_col].sum()
# Weighted Precision (Chunk-Text)
if total_chunks_sum > 0 and chunk_prec_col in existing_metric_cols:
weighted_chunk_precision = (group_df[chunk_prec_col] * group_df[total_chunks_col]).sum() / total_chunks_sum
# Weighted Recall (Chunk-Text)
if total_puncts_sum > 0 and chunk_rec_col in existing_metric_cols:
weighted_chunk_recall = (group_df[chunk_rec_col] * group_df[total_puncts_col]).sum() / total_puncts_sum
# Weighted Recall (Assembly-Punct)
if total_puncts_sum > 0 and assembly_rec_col in existing_metric_cols:
weighted_assembly_recall = (group_df[assembly_rec_col] * group_df[total_puncts_col]).sum() / total_puncts_sum
# Weighted F1 (Chunk-Text) - на основе weighted precision и recall
if weighted_chunk_precision + weighted_chunk_recall > 0:
weighted_chunk_f1 = (2 * weighted_chunk_precision * weighted_chunk_recall) / (weighted_chunk_precision + weighted_chunk_recall)
# Добавляем рассчитанные Weighted метрики в результат
agg_result[COLUMN_NAME_MAPPING.get('weighted_chunk_text_precision', 'Weighted Точность (Чанк-Текст)')] = weighted_chunk_precision
agg_result[COLUMN_NAME_MAPPING.get('weighted_chunk_text_recall', 'Weighted Полнота (Чанк-Текст)')] = weighted_chunk_recall
agg_result[COLUMN_NAME_MAPPING.get('weighted_chunk_text_f1', 'Weighted F1 (Чанк-Текст)')] = weighted_chunk_f1
agg_result[COLUMN_NAME_MAPPING.get('weighted_assembly_punct_recall', 'Weighted Полнота (Сборка-Пункт)')] = weighted_assembly_recall
# --- Micro метрики (На основе общих TP, FP, FN, ИСПОЛЬЗУЕМ РУССКИЕ НАЗВАНИЯ) ---
# Колонки уже определены выше
if not all(col in group_df.columns for col in [found_puncts_col, total_puncts_col, relevant_chunks_col, total_chunks_col]):
print(f"Предупреждение: Пропуск расчета micro-метрик для группы {params}, т.к. отсутствуют необходимые колонки.")
agg_result[COLUMN_NAME_MAPPING.get('micro_text_precision', 'Micro Точность (Текст)')] = 0.0
agg_result[COLUMN_NAME_MAPPING.get('micro_text_recall', 'Micro Полнота (Текст)')] = 0.0
agg_result[COLUMN_NAME_MAPPING.get('micro_text_f1', 'Micro F1 (Текст)')] = 0.0
# Добавляем результат группы в общий список
aggregated_results.append(agg_result)
# Создаем итоговый DataFrame (уже с русскими именами)
final_df = pd.DataFrame(aggregated_results)
print(f"Рассчитаны агрегированные метрики для {len(final_df)} комбинаций параметров.")
# Возвращаем DataFrame с русскими названиями колонок
return final_df
# --- Функции для форматирования Excel (адаптированы из combine_results.py) ---
def apply_excel_formatting(workbook: Workbook):
"""Применяет форматирование ко всем листам книги Excel."""
header_font = Font(bold=True)
header_fill = PatternFill(start_color="D9D9D9", end_color="D9D9D9", fill_type="solid")
center_alignment = Alignment(horizontal='center', vertical='center')
wrap_alignment = Alignment(horizontal='center', vertical='center', wrap_text=True)
thin_border = Border(
left=Side(style='thin'),
right=Side(style='thin'),
top=Side(style='thin'),
bottom=Side(style='thin')
)
thick_top_border = Border(top=Side(style='thick'))
for sheet_name in workbook.sheetnames:
sheet = workbook[sheet_name]
if sheet.max_row <= 1: # Пропускаем пустые листы
continue
# Форматирование заголовков
for cell in sheet[1]:
cell.font = header_font
cell.fill = header_fill
cell.alignment = wrap_alignment
cell.border = thin_border
# Автоподбор ширины и форматирование ячеек
for col_idx, column_cells in enumerate(sheet.columns, 1):
max_length = 0
column_letter = get_column_letter(col_idx)
is_numeric_metric_col = False
header_value = sheet.cell(row=1, column=col_idx).value
# Проверяем, является ли колонка числовой метрикой
if isinstance(header_value, str) and any(m in header_value for m in ['precision', 'recall', 'f1', 'relevance']):
is_numeric_metric_col = True
for i, cell in enumerate(column_cells):
# Применяем границы ко всем ячейкам
cell.border = thin_border
# Центрируем все, кроме заголовка
if i > 0:
cell.alignment = center_alignment
# Формат для числовых метрик
if is_numeric_metric_col and i > 0 and isinstance(cell.value, (int, float)):
cell.number_format = '0.0000'
# Расчет ширины
try:
cell_len = len(str(cell.value))
if cell_len > max_length:
max_length = cell_len
except:
pass
adjusted_width = (max_length + 2) * 1.1
sheet.column_dimensions[column_letter].width = min(adjusted_width, 60) # Ограничиваем макс ширину
# Автофильтр
sheet.auto_filter.ref = sheet.dimensions
# Группировка строк (опционально, можно добавить логику из combine_results, если нужна)
# ... (здесь можно вставить apply_group_formatting, если требуется) ...
print("Форматирование Excel завершено.")
def save_to_excel(data_dict: dict[str, pd.DataFrame], output_path: str):
"""Сохраняет несколько DataFrame в один Excel файл с форматированием."""
print(f"Сохранение результатов в Excel: {output_path}")
try:
workbook = Workbook()
workbook.remove(workbook.active) # Удаляем лист по умолчанию
for sheet_name, df in data_dict.items():
if df is not None and not df.empty:
sheet = workbook.create_sheet(title=sheet_name)
for r in dataframe_to_rows(df, index=False, header=True):
# Проверяем и заменяем недопустимые символы в ячейках
cleaned_row = []
for cell_value in r:
if isinstance(cell_value, str):
# Удаляем управляющие символы, кроме стандартных пробельных
cleaned_value = ''.join(c for c in cell_value if c.isprintable() or c in ' \t\n\r')
cleaned_row.append(cleaned_value)
else:
cleaned_row.append(cell_value)
sheet.append(cleaned_row)
print(f" Лист '{sheet_name}' добавлен ({len(df)} строк)")
else:
print(f" Лист '{sheet_name}' пропущен (нет данных)")
# Применяем форматирование ко всей книге
if workbook.sheetnames: # Проверяем, что есть хотя бы один лист
apply_excel_formatting(workbook)
workbook.save(output_path)
print("Excel файл успешно сохранен.")
else:
print("Нет данных для сохранения в Excel.")
except Exception as e:
print(f"Ошибка при сохранении Excel файла: {e}")
# --- Основная функция ---
def main():
"""Основная функция скрипта."""
args = parse_args()
# 1. Загрузка данных
combined_df_eng = load_intermediate_results(args.intermediate_dir)
if combined_df_eng.empty:
print("Нет данных для агрегации. Завершение.")
return
# --- Фильтрация по последнему batch_id (если флаг установлен) ---
target_df = combined_df_eng # По умолчанию используем все данные
if args.latest_batch_only:
print("Фильтрация по последнему batch_id...")
if 'batch_id' not in combined_df_eng.columns:
print("Предупреждение: Колонка 'batch_id' не найдена. Агрегация будет выполнена по всем данным.")
else:
# Находим последний batch_id (сортируем строки по batch_id)
# Сначала отфильтруем 'unknown_batch'
valid_batches = combined_df_eng[combined_df_eng['batch_id'] != 'unknown_batch']['batch_id'].unique()
if len(valid_batches) > 0:
# Сортируем уникальные валидные ID и берем последний
latest_batch_id = sorted(valid_batches)[-1]
print(f"Используется последний валидный batch_id: {latest_batch_id}")
target_df = combined_df_eng[combined_df_eng['batch_id'] == latest_batch_id].copy()
if target_df.empty:
# Это не должно произойти, если latest_batch_id валидный, но на всякий случай
print(f"Предупреждение: Не найдено данных для batch_id {latest_batch_id}. Агрегация будет выполнена по всем данным.")
target_df = combined_df_eng
else:
print(f"Оставлено строк после фильтрации: {len(target_df)}")
else:
print("Предупреждение: Не найдено валидных batch_id для фильтрации. Агрегация будет выполнена по всем данным.")
# target_df уже равен combined_df_eng, так что ничего не делаем
# latest_batch_id = combined_df_eng['batch_id'].astype(str).sort_values().iloc[-1]
# print(f"Используется последний batch_id: {latest_batch_id}")
# target_df = combined_df_eng[combined_df_eng['batch_id'] == latest_batch_id].copy()
# if target_df.empty:
# print(f"Предупреждение: Нет данных для batch_id {latest_batch_id}. Агрегация будет выполнена по всем данным.")
# target_df = combined_df_eng # Возвращаемся ко всем данным, если фильтр дал пустоту
# else:
# print(f"Оставлено строк после фильтрации: {len(target_df)}")
# --- Заполнение NaN и переименование ПОСЛЕ возможной фильтрации ---
# Определяем числовые колонки еще раз (используя английские названия из маппинга)
numeric_cols_eng = [eng for eng, rus in COLUMN_NAME_MAPPING.items() \
if 'recall' in eng or 'precision' in eng or 'f1' in eng or 'puncts' in eng \
or 'chunks' in eng or 'threshold' in eng or 'top_n' in eng]
numeric_cols_in_df = [col for col in numeric_cols_eng if col in target_df.columns]
target_df[numeric_cols_in_df] = target_df[numeric_cols_in_df].fillna(0)
# Переименовываем
columns_to_rename_detailed = {eng: rus for eng, rus in COLUMN_NAME_MAPPING.items() if eng in target_df.columns}
target_df_rus = target_df.rename(columns=columns_to_rename_detailed)
# 2. Расчет агрегированных метрик
# Передаем DataFrame с русскими названиями колонок, calculate_aggregated_metrics теперь их ожидает
aggregated_df_rus = calculate_aggregated_metrics(target_df_rus)
# Переименовываем столбцы агрегированного DF уже внутри calculate_aggregated_metrics
# aggregated_df_rus = pd.DataFrame() # Инициализируем на случай, если aggregated_df_eng пуст
# if not aggregated_df_eng.empty:
# columns_to_rename_agg = {eng: rus for eng, rus in COLUMN_NAME_MAPPING.items() if eng in aggregated_df_eng.columns}
# aggregated_df_rus = aggregated_df_eng.rename(columns=columns_to_rename_agg)
# 3. Подготовка данных для сохранения (с русскими названиями)
data_to_save = {
"Детальные результаты": target_df_rus, # Используем переименованный DF
"Агрегированные метрики": aggregated_df_rus, # Используем переименованный DF
}
# 4. Сохранение в Excel
os.makedirs(args.output_dir, exist_ok=True)
output_file_path = os.path.join(args.output_dir, args.output_filename)
save_to_excel(data_to_save, output_file_path)
if __name__ == "__main__":
main()