#!/usr/bin/env python # -*- coding: utf-8 -*- """ Скрипт для запуска множества пайплайнов оценки (`pipeline.py`) с различными комбинациями параметров. Собирает команды для `pipeline.py` и запускает их последовательно, логируя вывод каждого запуска. """ import argparse import json import os import pathlib import subprocess import sys import time from datetime import datetime from itertools import product from uuid import uuid4 # --- Конфигурация Экспериментов --- # Модели для тестирования MODELS_TO_TEST = [ # "intfloat/e5-base", # "intfloat/e5-large", "BAAI/bge-m3", # "deepvk/USER-bge-m3" # "ai-forever/FRIDA" # Требует --use-sentence-transformers ] # Параметры чанкинга (слова / перекрытие) CHUNKING_PARAMS = [ # Пример для стратегии "fixed_size" {"strategy": "fixed_size", "params": {"words_per_chunk": 50, "overlap_words": 25}}, # {"strategy": "fixed_size", "params": {"words_per_chunk": 100, "overlap_words": 25}}, # {"strategy": "fixed_size", "params": {"words_per_chunk": 50, "overlap_words": 0}}, # TODO: Добавить другие стратегии и их параметры, если нужно # {"strategy": "some_other_strategy", "params": {"param1": "value1"}} ] # Значения Top-N для ретривера TOP_N_VALUES = [20, 50, 100] # Использовать ли сборку контекста (InjectionBuilder) USE_INJECTION_OPTIONS = [False, True] # Порог схожести для fuzzy сравнения (чанк/пункт) SIMILARITY_THRESHOLDS = [0.7] # Опции использования Query Expansion USE_QE_OPTIONS = [False, True] # Опции обработки таблиц PROCESS_TABLES_OPTIONS = [True] # Опции включения соседей INCLUDE_NEIGHBORS_OPTIONS = [True] # --- Настройки Скрипта --- DEFAULT_LOG_DIR = "logs" # Директория для логов отдельных запусков pipeline.py DEFAULT_INTERMEDIATE_DIR = "data/intermediate" # Куда pipeline.py сохраняет свои результаты DEFAULT_PYTHON_EXECUTABLE = sys.executable # Использовать тот же python, что и для запуска этого скрипта def parse_args(): """Парсит аргументы командной строки.""" parser = argparse.ArgumentParser(description="Запуск серии оценочных пайплайнов") # Флаги для пропуска определенных измерений parser.add_argument("--skip-models", action="store_true", help="Пропустить итерацию по разным моделям (использовать первую в списке)") parser.add_argument("--skip-chunking", action="store_true", help="Пропустить итерацию по разным параметрам чанкинга (использовать первую в списке)") parser.add_argument("--skip-top-n", action="store_true", help="Пропустить итерацию по разным top_n (использовать первое значение)") parser.add_argument("--skip-injection", action="store_true", help="Пропустить итерацию по опциям сборки контекста (использовать False)") parser.add_argument("--skip-thresholds", action="store_true", help="Пропустить итерацию по порогам схожести (использовать первый)") parser.add_argument("--skip-process-tables", action="store_true", help="Пропустить итерацию по обработке таблиц (использовать True)") parser.add_argument("--skip-include-neighbors", action="store_true", help="Пропустить итерацию по включению соседей (использовать False)") parser.add_argument("--skip-qe", action="store_true", help="Пропустить итерацию по использованию Query Expansion (использовать False)") # Настройки путей и выполнения parser.add_argument("--log-dir", type=str, default=DEFAULT_LOG_DIR, help=f"Директория для сохранения логов запусков (по умолчанию: {DEFAULT_LOG_DIR})") parser.add_argument("--intermediate-dir", type=str, default=DEFAULT_INTERMEDIATE_DIR, help=f"Директория для промежуточных результатов pipeline.py (по умолчанию: {DEFAULT_INTERMEDIATE_DIR})") parser.add_argument("--device", type=str, default="cuda:0", help="Устройство для вычислений в pipeline.py (напр., cpu, cuda:0)") parser.add_argument("--python-executable", type=str, default=DEFAULT_PYTHON_EXECUTABLE, help="Путь к интерпретатору Python для запуска pipeline.py") # Параметры, передаваемые в pipeline.py (если не перебираются) parser.add_argument("--data-folder", type=str, default="data/input/docs", help="Папка с документами для pipeline.py") parser.add_argument("--search-dataset-path", type=str, default="data/input/search_dataset_text.xlsx", help="Поисковый датасет для pipeline.py") parser.add_argument("--qa-dataset-path", type=str, default="data/input/question_answering.xlsx", help="QA датасет для pipeline.py") return parser.parse_args() def run_single_pipeline(cmd: list[str], log_path: str): """ Запускает один экземпляр pipeline.py и логирует его вывод. Args: cmd: Список аргументов команды для subprocess. log_path: Путь к файлу для сохранения лога. Returns: Код возврата процесса. """ print(f"\n--- Запуск: {' '.join(cmd)} ---") print(f"--- Лог: {log_path} --- ") start_time = time.time() return_code = -1 try: with open(log_path, "w", encoding="utf-8") as log_file: log_file.write(f"Команда: {' '.join(cmd)}\n") log_file.write(f"Время запуска: {datetime.now()}\n\n") log_file.flush() # Запускаем процесс process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, # Перенаправляем stderr в stdout text=True, encoding='utf-8', # Указываем кодировку errors='replace', # Заменяем ошибки кодирования bufsize=1 # Построчная буферизация ) # Читаем и пишем вывод построчно for line in process.stdout: print(line, end="") # Выводим в консоль log_file.write(line) # Пишем в лог log_file.flush() # Ждем завершения и получаем код возврата process.wait() return_code = process.returncode except Exception as e: print(f"\nОшибка при запуске процесса: {e}") with open(log_path, "a", encoding="utf-8") as log_file: log_file.write(f"\nОшибка при запуске: {e}\n") return_code = 1 # Считаем ошибкой end_time = time.time() duration = end_time - start_time result_message = f"Успешно завершено за {duration:.2f} сек." if return_code != 0: result_message = f"Завершено с ошибкой (код {return_code}) за {duration:.2f} сек." print(f"--- {result_message} ---") with open(log_path, "a", encoding="utf-8") as log_file: log_file.write(f"\nВремя завершения: {datetime.now()}") log_file.write(f"\nДлительность: {duration:.2f} сек.") log_file.write(f"\nКод возврата: {return_code}\n") return return_code def main(): """Основная функция скрипта.""" args = parse_args() # --- Генерируем ID для всей серии запусков --- batch_run_id = f"batch_{datetime.now().strftime('%Y%m%d_%H%M%S')}" print(f"Запуск серии экспериментов. Batch ID: {batch_run_id}") # Создаем директории для логов и промежуточных результатов os.makedirs(args.log_dir, exist_ok=True) os.makedirs(args.intermediate_dir, exist_ok=True) # Определяем абсолютный путь к pipeline.py RUN_PIPELINES_SCRIPT_PATH = pathlib.Path(__file__).resolve() SCRIPTS_TESTING_DIR = RUN_PIPELINES_SCRIPT_PATH.parent PIPELINE_SCRIPT_PATH = SCRIPTS_TESTING_DIR / "pipeline.py" # --- Определяем параметры для перебора --- models = [MODELS_TO_TEST[0]] if args.skip_models else MODELS_TO_TEST chunking_configs = [CHUNKING_PARAMS[0]] if args.skip_chunking else CHUNKING_PARAMS top_n_list = [TOP_N_VALUES[0]] if args.skip_top_n else TOP_N_VALUES use_injection_list = [False] if args.skip_injection else USE_INJECTION_OPTIONS threshold_list = [SIMILARITY_THRESHOLDS[0]] if args.skip_thresholds else SIMILARITY_THRESHOLDS # Определяем списки для новых измерений process_tables_list = [PROCESS_TABLES_OPTIONS[0]] if args.skip_process_tables else PROCESS_TABLES_OPTIONS include_neighbors_list = [INCLUDE_NEIGHBORS_OPTIONS[0]] if args.skip_include_neighbors else INCLUDE_NEIGHBORS_OPTIONS use_qe_list = [USE_QE_OPTIONS[0]] if args.skip_qe else USE_QE_OPTIONS # --- Создаем список всех комбинаций параметров --- parameter_combinations = list(product( models, chunking_configs, top_n_list, use_injection_list, threshold_list, process_tables_list, include_neighbors_list, use_qe_list )) total_runs = len(parameter_combinations) print(f"Всего запланировано запусков: {total_runs}") # --- Запускаем пайплайны для каждой комбинации --- completed_runs = 0 failed_runs = 0 start_time_all = time.time() for i, (model, chunk_cfg, top_n, use_injection, threshold, process_tables, include_neighbors, use_qe) in enumerate(parameter_combinations): print(f"\n{'='*80}") print(f"Запуск {i+1}/{total_runs}") print(f" Модель: {model}") # Логируем параметры чанкинга strategy = chunk_cfg['strategy'] params = chunk_cfg['params'] params_str = json.dumps(params, ensure_ascii=False) print(f" Чанкинг: Стратегия='{strategy}', Параметры={params_str}") print(f" Обработка таблиц: {process_tables}") print(f" Top-N: {top_n}") print(f" Сборка контекста: {use_injection}") print(f" Query Expansion: {use_qe}") print(f" Включение соседей: {include_neighbors}") print(f" Порог схожести: {threshold}") print(f"{'='*80}") # Генерируем уникальный ID для этого запуска run_id = f"run_{datetime.now().strftime('%Y%m%d%H%M%S')}_{uuid4().hex[:8]}" # Формируем команду для pipeline.py cmd = [ args.python_executable, str(PIPELINE_SCRIPT_PATH), # Используем абсолютный путь "--run-id", run_id, "--batch-id", batch_run_id, "--data-folder", args.data_folder, "--search-dataset-path", args.search_dataset_path, "--output-dir", args.intermediate_dir, "--model-name", model, "--chunking-strategy", strategy, "--strategy-params", params_str, "--top-n", str(top_n), "--similarity-threshold", str(threshold), "--device", args.device, ] # Добавляем флаг --use-injection, если нужно if use_injection: cmd.append("--use-injection") # Добавляем флаг --no-process-tables, если process_tables == False if not process_tables: cmd.append("--no-process-tables") # Добавляем флаг --include-neighbors, если include_neighbors == True if include_neighbors: cmd.append("--include-neighbors") # Добавляем флаг --use-qe, если use_qe == True if use_qe: cmd.append("--use-qe") # Добавляем флаг --use-sentence-transformers для определенных моделей if "FRIDA" in model or "sentence-transformer" in model.lower(): # Пример cmd.append("--use-sentence-transformers") # Формируем путь к лог-файлу log_filename = f"{run_id}_log.txt" log_path = os.path.join(args.log_dir, log_filename) # Запускаем пайплайн return_code = run_single_pipeline(cmd, log_path) if return_code == 0: completed_runs += 1 else: failed_runs += 1 print(f"*** ВНИМАНИЕ: Запуск {i+1} завершился с ошибкой! Лог: {log_path} ***") # --- Вывод итоговой статистики --- end_time_all = time.time() total_duration = end_time_all - start_time_all print(f"\n{'='*80}") print("Все запуски завершены.") print(f"Общее время выполнения: {total_duration:.2f} сек ({total_duration/60:.2f} мин)") print(f"Всего запусков: {total_runs}") print(f"Успешно завершено: {completed_runs}") print(f"Завершено с ошибками: {failed_runs}") print(f"Промежуточные результаты сохранены в: {args.intermediate_dir}") print(f"Логи запусков сохранены в: {args.log_dir}") print(f"{'='*80}") if __name__ == "__main__": main()