#!/usr/bin/env python # -*- coding: utf-8 -*- """ Скрипт для агрегации и анализа результатов множества запусков pipeline.py. Читает все CSV-файлы из директории промежуточных результатов, объединяет их и вычисляет агрегированные метрики: - Weighted (усредненные по всем вопросам, взвешенные по количеству пунктов/чанков/документов) - Macro (усредненные по вопросам - сначала считаем метрику для каждого вопроса, потом усредняем) - Micro (считаем общие TP, FP, FN по всем вопросам, потом вычисляем метрики) Результаты сохраняются в один Excel-файл с несколькими листами. """ import argparse import glob # Импорт для обработки JSON строк import os import pandas as pd from openpyxl import Workbook from openpyxl.styles import Alignment, Border, Font, PatternFill, Side from openpyxl.utils import get_column_letter from openpyxl.utils.dataframe import dataframe_to_rows # Прогресс-бар from tqdm import tqdm # --- Настройки --- DEFAULT_INTERMEDIATE_DIR = "data/intermediate" # Откуда читать CSV DEFAULT_OUTPUT_DIR = "data/output" # Куда сохранять итоговый Excel DEFAULT_OUTPUT_FILENAME = "aggregated_results.xlsx" # --- Маппинг названий столбцов на русский язык --- COLUMN_NAME_MAPPING = { # Параметры запуска из pipeline.py 'run_id': 'ID Запуска', 'model_name': 'Модель', 'chunking_strategy': 'Стратегия Чанкинга', 'strategy_params': 'Параметры Стратегии', 'process_tables': 'Обраб. Таблиц', 'top_n': 'Top N', 'use_injection': 'Сборка Контекста', 'use_qe': 'Query Expansion', 'neighbors_included': 'Вкл. Соседей', 'similarity_threshold': 'Порог Схожести', # Идентификаторы из датасета (для детальных результатов) 'question_id': 'ID Вопроса', 'question_text': 'Текст Вопроса', # Детальные метрики из pipeline.py 'chunk_text_precision': 'Точность (Чанк-Текст)', 'chunk_text_recall': 'Полнота (Чанк-Текст)', 'chunk_text_f1': 'F1 (Чанк-Текст)', 'found_puncts': 'Найдено Пунктов', 'total_puncts': 'Всего Пунктов', 'relevant_chunks': 'Релевантных Чанков', 'total_chunks_in_top_n': 'Всего Чанков в Топ-N', 'assembly_punct_recall': 'Полнота (Сборка-Пункт)', 'assembled_context_preview': 'Предпросмотр Сборки', # 'top_chunk_ids': 'Индексы Топ-Чанков', # Списки, могут плохо отображаться # 'top_chunk_similarities': 'Схожести Топ-Чанков', # Списки # Агрегированные метрики (добавляются в calculate_aggregated_metrics) 'weighted_chunk_text_precision': 'Weighted Точность (Чанк-Текст)', 'weighted_chunk_text_recall': 'Weighted Полнота (Чанк-Текст)', 'weighted_chunk_text_f1': 'Weighted F1 (Чанк-Текст)', 'weighted_assembly_punct_recall': 'Weighted Полнота (Сборка-Пункт)', 'macro_chunk_text_precision': 'Macro Точность (Чанк-Текст)', 'macro_chunk_text_recall': 'Macro Полнота (Чанк-Текст)', 'macro_chunk_text_f1': 'Macro F1 (Чанк-Текст)', 'macro_assembly_punct_recall': 'Macro Полнота (Сборка-Пункт)', 'micro_text_precision': 'Micro Точность (Текст)', 'micro_text_recall': 'Micro Полнота (Текст)', 'micro_text_f1': 'Micro F1 (Текст)', } def parse_args(): """Парсит аргументы командной строки.""" parser = argparse.ArgumentParser(description="Агрегация результатов оценочных пайплайнов") parser.add_argument("--intermediate-dir", type=str, default=DEFAULT_INTERMEDIATE_DIR, help=f"Директория с промежуточными CSV результатами (по умолчанию: {DEFAULT_INTERMEDIATE_DIR})") parser.add_argument("--output-dir", type=str, default=DEFAULT_OUTPUT_DIR, help=f"Директория для сохранения итогового Excel файла (по умолчанию: {DEFAULT_OUTPUT_DIR})") parser.add_argument("--output-filename", type=str, default=DEFAULT_OUTPUT_FILENAME, help=f"Имя выходного Excel файла (по умолчанию: {DEFAULT_OUTPUT_FILENAME})") parser.add_argument("--latest-batch-only", action="store_true", help="Агрегировать результаты только для последнего batch_id") return parser.parse_args() def load_intermediate_results(intermediate_dir: str) -> pd.DataFrame: """Загружает все CSV файлы из указанной директории.""" print(f"Загрузка промежуточных результатов из: {intermediate_dir}") csv_files = glob.glob(os.path.join(intermediate_dir, "results_*.csv")) if not csv_files: print(f"ВНИМАНИЕ: В директории {intermediate_dir} не найдено файлов 'results_*.csv'.") return pd.DataFrame() all_data = [] for f in csv_files: try: df = pd.read_csv(f) all_data.append(df) print(f" Загружен файл: {os.path.basename(f)} ({len(df)} строк)") except Exception as e: print(f"Ошибка при чтении файла {f}: {e}") if not all_data: print("Не удалось загрузить ни одного файла с результатами.") return pd.DataFrame() combined_df = pd.concat(all_data, ignore_index=True) print(f"Всего загружено строк: {len(combined_df)}") print(f"Найденные колонки: {combined_df.columns.tolist()}") # Преобразуем типы данных для надежности numeric_cols = [ 'chunk_text_precision', 'chunk_text_recall', 'chunk_text_f1', 'found_puncts', 'total_puncts', 'relevant_chunks', 'total_chunks_in_top_n', 'assembly_punct_recall', 'similarity_threshold', 'top_n', ] for col in numeric_cols: if col in combined_df.columns: combined_df[col] = pd.to_numeric(combined_df[col], errors='coerce') boolean_cols = [ 'use_injection', 'process_tables', 'use_qe', 'neighbors_included' ] for col in boolean_cols: if col in combined_df.columns: # Пытаемся конвертировать в bool, обрабатывая строки 'True'/'False' if combined_df[col].dtype == 'object': combined_df[col] = combined_df[col].astype(str).str.lower().map({'true': True, 'false': False}).fillna(False) combined_df[col] = combined_df[col].astype(bool) # Заполним пропуски в числовых колонках нулями (например, если метрики не посчитались) combined_df[numeric_cols] = combined_df[numeric_cols].fillna(0) # --- Обработка batch_id --- if 'batch_id' in combined_df.columns: # Приводим к строке и заполняем NaN combined_df['batch_id'] = combined_df['batch_id'].astype(str).fillna('unknown_batch') else: # Если колонки нет, создаем ее print("Предупреждение: Колонка 'batch_id' отсутствует в загруженных данных. Добавлена со значением 'unknown_batch'.") combined_df['batch_id'] = 'unknown_batch' # -------------------------- # Переименовываем столбцы в русские названия ДО возврата # Отбираем только те колонки, для которых есть перевод columns_to_rename = {eng: rus for eng, rus in COLUMN_NAME_MAPPING.items() if eng in combined_df.columns} combined_df = combined_df.rename(columns=columns_to_rename) print(f"Столбцы переименованы. Новые колонки: {combined_df.columns.tolist()}") return combined_df def calculate_aggregated_metrics(df: pd.DataFrame) -> pd.DataFrame: """ Вычисляет агрегированные метрики (Weighted, Macro, Micro) для каждой уникальной комбинации параметров запуска. Ожидает DataFrame с русскими названиями колонок. """ if df.empty: return pd.DataFrame() # Определяем параметры, по которым будем группировать (ИСПОЛЬЗУЕМ РУССКИЕ НАЗВАНИЯ) grouping_params_rus = [ COLUMN_NAME_MAPPING.get('model_name', 'Модель'), COLUMN_NAME_MAPPING.get('chunking_strategy', 'Стратегия Чанкинга'), COLUMN_NAME_MAPPING.get('strategy_params', 'Параметры Стратегии'), COLUMN_NAME_MAPPING.get('process_tables', 'Обраб. Таблиц'), COLUMN_NAME_MAPPING.get('top_n', 'Top N'), COLUMN_NAME_MAPPING.get('use_injection', 'Сборка Контекста'), COLUMN_NAME_MAPPING.get('use_qe', 'Query Expansion'), COLUMN_NAME_MAPPING.get('neighbors_included', 'Вкл. Соседей'), COLUMN_NAME_MAPPING.get('similarity_threshold', 'Порог Схожести') ] # Проверяем наличие всех колонок для группировки (с русскими именами) missing_cols = [col for col in grouping_params_rus if col not in df.columns] if missing_cols: print(f"Ошибка: Отсутствуют необходимые колонки для группировки (русские): {missing_cols}") # Удаляем отсутствующие колонки из списка группировки grouping_params_rus = [col for col in grouping_params_rus if col not in missing_cols] if not grouping_params_rus: print("Невозможно выполнить группировку.") return pd.DataFrame() print(f"Группировка по параметрам (русские): {grouping_params_rus}") # Используем grouping_params_rus для группировки grouped = df.groupby(grouping_params_rus) aggregated_results = [] # Итерируемся по каждой группе (комбинации параметров) for params, group_df in tqdm(grouped, desc="Расчет агрегированных метрик"): # Начинаем со словаря параметров (уже с русскими именами) agg_result = dict(zip(grouping_params_rus, params)) # --- Метрики для усреднения/взвешивания (РУССКИЕ НАЗВАНИЯ) --- chunk_prec_col = COLUMN_NAME_MAPPING.get('chunk_text_precision', 'Точность (Чанк-Текст)') chunk_rec_col = COLUMN_NAME_MAPPING.get('chunk_text_recall', 'Полнота (Чанк-Текст)') chunk_f1_col = COLUMN_NAME_MAPPING.get('chunk_text_f1', 'F1 (Чанк-Текст)') assembly_rec_col = COLUMN_NAME_MAPPING.get('assembly_punct_recall', 'Полнота (Сборка-Пункт)') total_chunks_col = COLUMN_NAME_MAPPING.get('total_chunks_in_top_n', 'Всего Чанков в Топ-N') total_puncts_col = COLUMN_NAME_MAPPING.get('total_puncts', 'Всего Пунктов') found_puncts_col = COLUMN_NAME_MAPPING.get('found_puncts', 'Найдено Пунктов') # Для micro relevant_chunks_col = COLUMN_NAME_MAPPING.get('relevant_chunks', 'Релевантных Чанков') # Для micro # Колонки, которые должны существовать для расчетов required_metric_cols = [chunk_prec_col, chunk_rec_col, chunk_f1_col, assembly_rec_col] required_count_cols = [total_chunks_col, total_puncts_col, found_puncts_col, relevant_chunks_col] existing_metric_cols = [m for m in required_metric_cols if m in group_df.columns] existing_count_cols = [c for c in required_count_cols if c in group_df.columns] # --- Macro метрики (Простое усреднение метрик по вопросам) --- if existing_metric_cols: macro_metrics = group_df[existing_metric_cols].mean().rename( # Генерируем имя 'Macro Имя Метрики' lambda x: COLUMN_NAME_MAPPING.get(f"macro_{{key}}".format(key=next((k for k, v in COLUMN_NAME_MAPPING.items() if v == x), None)), f"Macro {x}") ).to_dict() agg_result.update(macro_metrics) else: print(f"Предупреждение: Пропуск Macro метрик для группы {params}, нет колонок метрик.") # --- Weighted метрики (Взвешенное усреднение) --- weighted_chunk_precision = 0.0 weighted_chunk_recall = 0.0 weighted_assembly_recall = 0.0 weighted_chunk_f1 = 0.0 # Проверяем наличие необходимых колонок для взвешенного расчета can_calculate_weighted = True if chunk_prec_col not in existing_metric_cols or total_chunks_col not in existing_count_cols: print(f"Предупреждение: Пропуск Weighted Точность (Чанк-Текст) для группы {params}, отсутствуют {chunk_prec_col} или {total_chunks_col}.") can_calculate_weighted = False if chunk_rec_col not in existing_metric_cols or total_puncts_col not in existing_count_cols: print(f"Предупреждение: Пропуск Weighted Полнота (Чанк-Текст) для группы {params}, отсутствуют {chunk_rec_col} или {total_puncts_col}.") can_calculate_weighted = False if assembly_rec_col not in existing_metric_cols or total_puncts_col not in existing_count_cols: print(f"Предупреждение: Пропуск Weighted Полнота (Сборка-Пункт) для группы {params}, отсутствуют {assembly_rec_col} или {total_puncts_col}.") # Не сбрасываем can_calculate_weighted, т.к. другие weighted могут посчитаться if can_calculate_weighted: total_chunks_sum = group_df[total_chunks_col].sum() total_puncts_sum = group_df[total_puncts_col].sum() # Weighted Precision (Chunk-Text) if total_chunks_sum > 0 and chunk_prec_col in existing_metric_cols: weighted_chunk_precision = (group_df[chunk_prec_col] * group_df[total_chunks_col]).sum() / total_chunks_sum # Weighted Recall (Chunk-Text) if total_puncts_sum > 0 and chunk_rec_col in existing_metric_cols: weighted_chunk_recall = (group_df[chunk_rec_col] * group_df[total_puncts_col]).sum() / total_puncts_sum # Weighted Recall (Assembly-Punct) if total_puncts_sum > 0 and assembly_rec_col in existing_metric_cols: weighted_assembly_recall = (group_df[assembly_rec_col] * group_df[total_puncts_col]).sum() / total_puncts_sum # Weighted F1 (Chunk-Text) - на основе weighted precision и recall if weighted_chunk_precision + weighted_chunk_recall > 0: weighted_chunk_f1 = (2 * weighted_chunk_precision * weighted_chunk_recall) / (weighted_chunk_precision + weighted_chunk_recall) # Добавляем рассчитанные Weighted метрики в результат agg_result[COLUMN_NAME_MAPPING.get('weighted_chunk_text_precision', 'Weighted Точность (Чанк-Текст)')] = weighted_chunk_precision agg_result[COLUMN_NAME_MAPPING.get('weighted_chunk_text_recall', 'Weighted Полнота (Чанк-Текст)')] = weighted_chunk_recall agg_result[COLUMN_NAME_MAPPING.get('weighted_chunk_text_f1', 'Weighted F1 (Чанк-Текст)')] = weighted_chunk_f1 agg_result[COLUMN_NAME_MAPPING.get('weighted_assembly_punct_recall', 'Weighted Полнота (Сборка-Пункт)')] = weighted_assembly_recall # --- Micro метрики (На основе общих TP, FP, FN, ИСПОЛЬЗУЕМ РУССКИЕ НАЗВАНИЯ) --- # Колонки уже определены выше if not all(col in group_df.columns for col in [found_puncts_col, total_puncts_col, relevant_chunks_col, total_chunks_col]): print(f"Предупреждение: Пропуск расчета micro-метрик для группы {params}, т.к. отсутствуют необходимые колонки.") agg_result[COLUMN_NAME_MAPPING.get('micro_text_precision', 'Micro Точность (Текст)')] = 0.0 agg_result[COLUMN_NAME_MAPPING.get('micro_text_recall', 'Micro Полнота (Текст)')] = 0.0 agg_result[COLUMN_NAME_MAPPING.get('micro_text_f1', 'Micro F1 (Текст)')] = 0.0 # Добавляем результат группы в общий список aggregated_results.append(agg_result) # Создаем итоговый DataFrame (уже с русскими именами) final_df = pd.DataFrame(aggregated_results) print(f"Рассчитаны агрегированные метрики для {len(final_df)} комбинаций параметров.") # Возвращаем DataFrame с русскими названиями колонок return final_df # --- Функции для форматирования Excel (адаптированы из combine_results.py) --- def apply_excel_formatting(workbook: Workbook): """Применяет форматирование ко всем листам книги Excel.""" header_font = Font(bold=True) header_fill = PatternFill(start_color="D9D9D9", end_color="D9D9D9", fill_type="solid") center_alignment = Alignment(horizontal='center', vertical='center') wrap_alignment = Alignment(horizontal='center', vertical='center', wrap_text=True) thin_border = Border( left=Side(style='thin'), right=Side(style='thin'), top=Side(style='thin'), bottom=Side(style='thin') ) thick_top_border = Border(top=Side(style='thick')) for sheet_name in workbook.sheetnames: sheet = workbook[sheet_name] if sheet.max_row <= 1: # Пропускаем пустые листы continue # Форматирование заголовков for cell in sheet[1]: cell.font = header_font cell.fill = header_fill cell.alignment = wrap_alignment cell.border = thin_border # Автоподбор ширины и форматирование ячеек for col_idx, column_cells in enumerate(sheet.columns, 1): max_length = 0 column_letter = get_column_letter(col_idx) is_numeric_metric_col = False header_value = sheet.cell(row=1, column=col_idx).value # Проверяем, является ли колонка числовой метрикой if isinstance(header_value, str) and any(m in header_value for m in ['precision', 'recall', 'f1', 'relevance']): is_numeric_metric_col = True for i, cell in enumerate(column_cells): # Применяем границы ко всем ячейкам cell.border = thin_border # Центрируем все, кроме заголовка if i > 0: cell.alignment = center_alignment # Формат для числовых метрик if is_numeric_metric_col and i > 0 and isinstance(cell.value, (int, float)): cell.number_format = '0.0000' # Расчет ширины try: cell_len = len(str(cell.value)) if cell_len > max_length: max_length = cell_len except: pass adjusted_width = (max_length + 2) * 1.1 sheet.column_dimensions[column_letter].width = min(adjusted_width, 60) # Ограничиваем макс ширину # Автофильтр sheet.auto_filter.ref = sheet.dimensions # Группировка строк (опционально, можно добавить логику из combine_results, если нужна) # ... (здесь можно вставить apply_group_formatting, если требуется) ... print("Форматирование Excel завершено.") def save_to_excel(data_dict: dict[str, pd.DataFrame], output_path: str): """Сохраняет несколько DataFrame в один Excel файл с форматированием.""" print(f"Сохранение результатов в Excel: {output_path}") try: workbook = Workbook() workbook.remove(workbook.active) # Удаляем лист по умолчанию for sheet_name, df in data_dict.items(): if df is not None and not df.empty: sheet = workbook.create_sheet(title=sheet_name) for r in dataframe_to_rows(df, index=False, header=True): # Проверяем и заменяем недопустимые символы в ячейках cleaned_row = [] for cell_value in r: if isinstance(cell_value, str): # Удаляем управляющие символы, кроме стандартных пробельных cleaned_value = ''.join(c for c in cell_value if c.isprintable() or c in ' \t\n\r') cleaned_row.append(cleaned_value) else: cleaned_row.append(cell_value) sheet.append(cleaned_row) print(f" Лист '{sheet_name}' добавлен ({len(df)} строк)") else: print(f" Лист '{sheet_name}' пропущен (нет данных)") # Применяем форматирование ко всей книге if workbook.sheetnames: # Проверяем, что есть хотя бы один лист apply_excel_formatting(workbook) workbook.save(output_path) print("Excel файл успешно сохранен.") else: print("Нет данных для сохранения в Excel.") except Exception as e: print(f"Ошибка при сохранении Excel файла: {e}") # --- Основная функция --- def main(): """Основная функция скрипта.""" args = parse_args() # 1. Загрузка данных combined_df_eng = load_intermediate_results(args.intermediate_dir) if combined_df_eng.empty: print("Нет данных для агрегации. Завершение.") return # --- Фильтрация по последнему batch_id (если флаг установлен) --- target_df = combined_df_eng # По умолчанию используем все данные if args.latest_batch_only: print("Фильтрация по последнему batch_id...") if 'batch_id' not in combined_df_eng.columns: print("Предупреждение: Колонка 'batch_id' не найдена. Агрегация будет выполнена по всем данным.") else: # Находим последний batch_id (сортируем строки по batch_id) # Сначала отфильтруем 'unknown_batch' valid_batches = combined_df_eng[combined_df_eng['batch_id'] != 'unknown_batch']['batch_id'].unique() if len(valid_batches) > 0: # Сортируем уникальные валидные ID и берем последний latest_batch_id = sorted(valid_batches)[-1] print(f"Используется последний валидный batch_id: {latest_batch_id}") target_df = combined_df_eng[combined_df_eng['batch_id'] == latest_batch_id].copy() if target_df.empty: # Это не должно произойти, если latest_batch_id валидный, но на всякий случай print(f"Предупреждение: Не найдено данных для batch_id {latest_batch_id}. Агрегация будет выполнена по всем данным.") target_df = combined_df_eng else: print(f"Оставлено строк после фильтрации: {len(target_df)}") else: print("Предупреждение: Не найдено валидных batch_id для фильтрации. Агрегация будет выполнена по всем данным.") # target_df уже равен combined_df_eng, так что ничего не делаем # latest_batch_id = combined_df_eng['batch_id'].astype(str).sort_values().iloc[-1] # print(f"Используется последний batch_id: {latest_batch_id}") # target_df = combined_df_eng[combined_df_eng['batch_id'] == latest_batch_id].copy() # if target_df.empty: # print(f"Предупреждение: Нет данных для batch_id {latest_batch_id}. Агрегация будет выполнена по всем данным.") # target_df = combined_df_eng # Возвращаемся ко всем данным, если фильтр дал пустоту # else: # print(f"Оставлено строк после фильтрации: {len(target_df)}") # --- Заполнение NaN и переименование ПОСЛЕ возможной фильтрации --- # Определяем числовые колонки еще раз (используя английские названия из маппинга) numeric_cols_eng = [eng for eng, rus in COLUMN_NAME_MAPPING.items() \ if 'recall' in eng or 'precision' in eng or 'f1' in eng or 'puncts' in eng \ or 'chunks' in eng or 'threshold' in eng or 'top_n' in eng] numeric_cols_in_df = [col for col in numeric_cols_eng if col in target_df.columns] target_df[numeric_cols_in_df] = target_df[numeric_cols_in_df].fillna(0) # Переименовываем columns_to_rename_detailed = {eng: rus for eng, rus in COLUMN_NAME_MAPPING.items() if eng in target_df.columns} target_df_rus = target_df.rename(columns=columns_to_rename_detailed) # 2. Расчет агрегированных метрик # Передаем DataFrame с русскими названиями колонок, calculate_aggregated_metrics теперь их ожидает aggregated_df_rus = calculate_aggregated_metrics(target_df_rus) # Переименовываем столбцы агрегированного DF уже внутри calculate_aggregated_metrics # aggregated_df_rus = pd.DataFrame() # Инициализируем на случай, если aggregated_df_eng пуст # if not aggregated_df_eng.empty: # columns_to_rename_agg = {eng: rus for eng, rus in COLUMN_NAME_MAPPING.items() if eng in aggregated_df_eng.columns} # aggregated_df_rus = aggregated_df_eng.rename(columns=columns_to_rename_agg) # 3. Подготовка данных для сохранения (с русскими названиями) data_to_save = { "Детальные результаты": target_df_rus, # Используем переименованный DF "Агрегированные метрики": aggregated_df_rus, # Используем переименованный DF } # 4. Сохранение в Excel os.makedirs(args.output_dir, exist_ok=True) output_file_path = os.path.join(args.output_dir, args.output_filename) save_to_excel(data_to_save, output_file_path) if __name__ == "__main__": main()