muryshev's picture
update
fd485d9
raw
history blame
15.1 kB
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Скрипт для запуска множества пайплайнов оценки (`pipeline.py`)
с различными комбинациями параметров.
Собирает команды для `pipeline.py` и запускает их последовательно,
логируя вывод каждого запуска.
"""
import argparse
import json
import os
import pathlib
import subprocess
import sys
import time
from datetime import datetime
from itertools import product
from uuid import uuid4
# --- Конфигурация Экспериментов ---
# Модели для тестирования
MODELS_TO_TEST = [
# "intfloat/e5-base",
# "intfloat/e5-large",
"BAAI/bge-m3",
# "deepvk/USER-bge-m3"
# "ai-forever/FRIDA" # Требует --use-sentence-transformers
]
# Параметры чанкинга (слова / перекрытие)
CHUNKING_PARAMS = [
# Пример для стратегии "fixed_size"
{"strategy": "fixed_size", "params": {"words_per_chunk": 50, "overlap_words": 25}},
# {"strategy": "fixed_size", "params": {"words_per_chunk": 100, "overlap_words": 25}},
# {"strategy": "fixed_size", "params": {"words_per_chunk": 50, "overlap_words": 0}},
# TODO: Добавить другие стратегии и их параметры, если нужно
# {"strategy": "some_other_strategy", "params": {"param1": "value1"}}
]
# Значения Top-N для ретривера
TOP_N_VALUES = [20, 50, 100]
# Использовать ли сборку контекста (InjectionBuilder)
USE_INJECTION_OPTIONS = [False, True]
# Порог схожести для fuzzy сравнения (чанк/пункт)
SIMILARITY_THRESHOLDS = [0.7]
# Опции использования Query Expansion
USE_QE_OPTIONS = [False, True]
# Опции обработки таблиц
PROCESS_TABLES_OPTIONS = [True]
# Опции включения соседей
INCLUDE_NEIGHBORS_OPTIONS = [True]
# --- Настройки Скрипта ---
DEFAULT_LOG_DIR = "logs" # Директория для логов отдельных запусков pipeline.py
DEFAULT_INTERMEDIATE_DIR = "data/intermediate" # Куда pipeline.py сохраняет свои результаты
DEFAULT_PYTHON_EXECUTABLE = sys.executable # Использовать тот же python, что и для запуска этого скрипта
def parse_args():
"""Парсит аргументы командной строки."""
parser = argparse.ArgumentParser(description="Запуск серии оценочных пайплайнов")
# Флаги для пропуска определенных измерений
parser.add_argument("--skip-models", action="store_true",
help="Пропустить итерацию по разным моделям (использовать первую в списке)")
parser.add_argument("--skip-chunking", action="store_true",
help="Пропустить итерацию по разным параметрам чанкинга (использовать первую в списке)")
parser.add_argument("--skip-top-n", action="store_true",
help="Пропустить итерацию по разным top_n (использовать первое значение)")
parser.add_argument("--skip-injection", action="store_true",
help="Пропустить итерацию по опциям сборки контекста (использовать False)")
parser.add_argument("--skip-thresholds", action="store_true",
help="Пропустить итерацию по порогам схожести (использовать первый)")
parser.add_argument("--skip-process-tables", action="store_true",
help="Пропустить итерацию по обработке таблиц (использовать True)")
parser.add_argument("--skip-include-neighbors", action="store_true",
help="Пропустить итерацию по включению соседей (использовать False)")
parser.add_argument("--skip-qe", action="store_true",
help="Пропустить итерацию по использованию Query Expansion (использовать False)")
# Настройки путей и выполнения
parser.add_argument("--log-dir", type=str, default=DEFAULT_LOG_DIR,
help=f"Директория для сохранения логов запусков (по умолчанию: {DEFAULT_LOG_DIR})")
parser.add_argument("--intermediate-dir", type=str, default=DEFAULT_INTERMEDIATE_DIR,
help=f"Директория для промежуточных результатов pipeline.py (по умолчанию: {DEFAULT_INTERMEDIATE_DIR})")
parser.add_argument("--device", type=str, default="cuda:0",
help="Устройство для вычислений в pipeline.py (напр., cpu, cuda:0)")
parser.add_argument("--python-executable", type=str, default=DEFAULT_PYTHON_EXECUTABLE,
help="Путь к интерпретатору Python для запуска pipeline.py")
# Параметры, передаваемые в pipeline.py (если не перебираются)
parser.add_argument("--data-folder", type=str, default="data/input/docs", help="Папка с документами для pipeline.py")
parser.add_argument("--search-dataset-path", type=str, default="data/input/search_dataset_text.xlsx", help="Поисковый датасет для pipeline.py")
parser.add_argument("--qa-dataset-path", type=str, default="data/input/question_answering.xlsx", help="QA датасет для pipeline.py")
return parser.parse_args()
def run_single_pipeline(cmd: list[str], log_path: str):
"""
Запускает один экземпляр pipeline.py и логирует его вывод.
Args:
cmd: Список аргументов команды для subprocess.
log_path: Путь к файлу для сохранения лога.
Returns:
Код возврата процесса.
"""
print(f"\n--- Запуск: {' '.join(cmd)} ---")
print(f"--- Лог: {log_path} --- ")
start_time = time.time()
return_code = -1
try:
with open(log_path, "w", encoding="utf-8") as log_file:
log_file.write(f"Команда: {' '.join(cmd)}\n")
log_file.write(f"Время запуска: {datetime.now()}\n\n")
log_file.flush()
# Запускаем процесс
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, # Перенаправляем stderr в stdout
text=True,
encoding='utf-8', # Указываем кодировку
errors='replace', # Заменяем ошибки кодирования
bufsize=1 # Построчная буферизация
)
# Читаем и пишем вывод построчно
for line in process.stdout:
print(line, end="") # Выводим в консоль
log_file.write(line) # Пишем в лог
log_file.flush()
# Ждем завершения и получаем код возврата
process.wait()
return_code = process.returncode
except Exception as e:
print(f"\nОшибка при запуске процесса: {e}")
with open(log_path, "a", encoding="utf-8") as log_file:
log_file.write(f"\nОшибка при запуске: {e}\n")
return_code = 1 # Считаем ошибкой
end_time = time.time()
duration = end_time - start_time
result_message = f"Успешно завершено за {duration:.2f} сек."
if return_code != 0:
result_message = f"Завершено с ошибкой (код {return_code}) за {duration:.2f} сек."
print(f"--- {result_message} ---")
with open(log_path, "a", encoding="utf-8") as log_file:
log_file.write(f"\nВремя завершения: {datetime.now()}")
log_file.write(f"\nДлительность: {duration:.2f} сек.")
log_file.write(f"\nКод возврата: {return_code}\n")
return return_code
def main():
"""Основная функция скрипта."""
args = parse_args()
# --- Генерируем ID для всей серии запусков ---
batch_run_id = f"batch_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
print(f"Запуск серии экспериментов. Batch ID: {batch_run_id}")
# Создаем директории для логов и промежуточных результатов
os.makedirs(args.log_dir, exist_ok=True)
os.makedirs(args.intermediate_dir, exist_ok=True)
# Определяем абсолютный путь к pipeline.py
RUN_PIPELINES_SCRIPT_PATH = pathlib.Path(__file__).resolve()
SCRIPTS_TESTING_DIR = RUN_PIPELINES_SCRIPT_PATH.parent
PIPELINE_SCRIPT_PATH = SCRIPTS_TESTING_DIR / "pipeline.py"
# --- Определяем параметры для перебора ---
models = [MODELS_TO_TEST[0]] if args.skip_models else MODELS_TO_TEST
chunking_configs = [CHUNKING_PARAMS[0]] if args.skip_chunking else CHUNKING_PARAMS
top_n_list = [TOP_N_VALUES[0]] if args.skip_top_n else TOP_N_VALUES
use_injection_list = [False] if args.skip_injection else USE_INJECTION_OPTIONS
threshold_list = [SIMILARITY_THRESHOLDS[0]] if args.skip_thresholds else SIMILARITY_THRESHOLDS
# Определяем списки для новых измерений
process_tables_list = [PROCESS_TABLES_OPTIONS[0]] if args.skip_process_tables else PROCESS_TABLES_OPTIONS
include_neighbors_list = [INCLUDE_NEIGHBORS_OPTIONS[0]] if args.skip_include_neighbors else INCLUDE_NEIGHBORS_OPTIONS
use_qe_list = [USE_QE_OPTIONS[0]] if args.skip_qe else USE_QE_OPTIONS
# --- Создаем список всех комбинаций параметров ---
parameter_combinations = list(product(
models,
chunking_configs,
top_n_list,
use_injection_list,
threshold_list,
process_tables_list,
include_neighbors_list,
use_qe_list
))
total_runs = len(parameter_combinations)
print(f"Всего запланировано запусков: {total_runs}")
# --- Запускаем пайплайны для каждой комбинации ---
completed_runs = 0
failed_runs = 0
start_time_all = time.time()
for i, (model, chunk_cfg, top_n, use_injection, threshold, process_tables, include_neighbors, use_qe) in enumerate(parameter_combinations):
print(f"\n{'='*80}")
print(f"Запуск {i+1}/{total_runs}")
print(f" Модель: {model}")
# Логируем параметры чанкинга
strategy = chunk_cfg['strategy']
params = chunk_cfg['params']
params_str = json.dumps(params, ensure_ascii=False)
print(f" Чанкинг: Стратегия='{strategy}', Параметры={params_str}")
print(f" Обработка таблиц: {process_tables}")
print(f" Top-N: {top_n}")
print(f" Сборка контекста: {use_injection}")
print(f" Query Expansion: {use_qe}")
print(f" Включение соседей: {include_neighbors}")
print(f" Порог схожести: {threshold}")
print(f"{'='*80}")
# Генерируем уникальный ID для этого запуска
run_id = f"run_{datetime.now().strftime('%Y%m%d%H%M%S')}_{uuid4().hex[:8]}"
# Формируем команду для pipeline.py
cmd = [
args.python_executable,
str(PIPELINE_SCRIPT_PATH), # Используем абсолютный путь
"--run-id", run_id,
"--batch-id", batch_run_id,
"--data-folder", args.data_folder,
"--search-dataset-path", args.search_dataset_path,
"--output-dir", args.intermediate_dir,
"--model-name", model,
"--chunking-strategy", strategy,
"--strategy-params", params_str,
"--top-n", str(top_n),
"--similarity-threshold", str(threshold),
"--device", args.device,
]
# Добавляем флаг --use-injection, если нужно
if use_injection:
cmd.append("--use-injection")
# Добавляем флаг --no-process-tables, если process_tables == False
if not process_tables:
cmd.append("--no-process-tables")
# Добавляем флаг --include-neighbors, если include_neighbors == True
if include_neighbors:
cmd.append("--include-neighbors")
# Добавляем флаг --use-qe, если use_qe == True
if use_qe:
cmd.append("--use-qe")
# Добавляем флаг --use-sentence-transformers для определенных моделей
if "FRIDA" in model or "sentence-transformer" in model.lower(): # Пример
cmd.append("--use-sentence-transformers")
# Формируем путь к лог-файлу
log_filename = f"{run_id}_log.txt"
log_path = os.path.join(args.log_dir, log_filename)
# Запускаем пайплайн
return_code = run_single_pipeline(cmd, log_path)
if return_code == 0:
completed_runs += 1
else:
failed_runs += 1
print(f"*** ВНИМАНИЕ: Запуск {i+1} завершился с ошибкой! Лог: {log_path} ***")
# --- Вывод итоговой статистики ---
end_time_all = time.time()
total_duration = end_time_all - start_time_all
print(f"\n{'='*80}")
print("Все запуски завершены.")
print(f"Общее время выполнения: {total_duration:.2f} сек ({total_duration/60:.2f} мин)")
print(f"Всего запусков: {total_runs}")
print(f"Успешно завершено: {completed_runs}")
print(f"Завершено с ошибками: {failed_runs}")
print(f"Промежуточные результаты сохранены в: {args.intermediate_dir}")
print(f"Логи запусков сохранены в: {args.log_dir}")
print(f"{'='*80}")
if __name__ == "__main__":
main()