Spaces:
Sleeping
Sleeping
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
""" | |
Скрипт для запуска множества пайплайнов оценки (`pipeline.py`) | |
с различными комбинациями параметров. | |
Собирает команды для `pipeline.py` и запускает их последовательно, | |
логируя вывод каждого запуска. | |
""" | |
import argparse | |
import json | |
import os | |
import pathlib | |
import subprocess | |
import sys | |
import time | |
from datetime import datetime | |
from itertools import product | |
from uuid import uuid4 | |
# --- Конфигурация Экспериментов --- | |
# Модели для тестирования | |
MODELS_TO_TEST = [ | |
# "intfloat/e5-base", | |
# "intfloat/e5-large", | |
"BAAI/bge-m3", | |
# "deepvk/USER-bge-m3" | |
# "ai-forever/FRIDA" # Требует --use-sentence-transformers | |
] | |
# Параметры чанкинга (слова / перекрытие) | |
CHUNKING_PARAMS = [ | |
# Пример для стратегии "fixed_size" | |
{"strategy": "fixed_size", "params": {"words_per_chunk": 50, "overlap_words": 25}}, | |
# {"strategy": "fixed_size", "params": {"words_per_chunk": 100, "overlap_words": 25}}, | |
# {"strategy": "fixed_size", "params": {"words_per_chunk": 50, "overlap_words": 0}}, | |
# TODO: Добавить другие стратегии и их параметры, если нужно | |
# {"strategy": "some_other_strategy", "params": {"param1": "value1"}} | |
] | |
# Значения Top-N для ретривера | |
TOP_N_VALUES = [20, 50, 100] | |
# Использовать ли сборку контекста (InjectionBuilder) | |
USE_INJECTION_OPTIONS = [False, True] | |
# Порог схожести для fuzzy сравнения (чанк/пункт) | |
SIMILARITY_THRESHOLDS = [0.7] | |
# Опции использования Query Expansion | |
USE_QE_OPTIONS = [False, True] | |
# Опции обработки таблиц | |
PROCESS_TABLES_OPTIONS = [True] | |
# Опции включения соседей | |
INCLUDE_NEIGHBORS_OPTIONS = [True] | |
# --- Настройки Скрипта --- | |
DEFAULT_LOG_DIR = "logs" # Директория для логов отдельных запусков pipeline.py | |
DEFAULT_INTERMEDIATE_DIR = "data/intermediate" # Куда pipeline.py сохраняет свои результаты | |
DEFAULT_PYTHON_EXECUTABLE = sys.executable # Использовать тот же python, что и для запуска этого скрипта | |
def parse_args(): | |
"""Парсит аргументы командной строки.""" | |
parser = argparse.ArgumentParser(description="Запуск серии оценочных пайплайнов") | |
# Флаги для пропуска определенных измерений | |
parser.add_argument("--skip-models", action="store_true", | |
help="Пропустить итерацию по разным моделям (использовать первую в списке)") | |
parser.add_argument("--skip-chunking", action="store_true", | |
help="Пропустить итерацию по разным параметрам чанкинга (использовать первую в списке)") | |
parser.add_argument("--skip-top-n", action="store_true", | |
help="Пропустить итерацию по разным top_n (использовать первое значение)") | |
parser.add_argument("--skip-injection", action="store_true", | |
help="Пропустить итерацию по опциям сборки контекста (использовать False)") | |
parser.add_argument("--skip-thresholds", action="store_true", | |
help="Пропустить итерацию по порогам схожести (использовать первый)") | |
parser.add_argument("--skip-process-tables", action="store_true", | |
help="Пропустить итерацию по обработке таблиц (использовать True)") | |
parser.add_argument("--skip-include-neighbors", action="store_true", | |
help="Пропустить итерацию по включению соседей (использовать False)") | |
parser.add_argument("--skip-qe", action="store_true", | |
help="Пропустить итерацию по использованию Query Expansion (использовать False)") | |
# Настройки путей и выполнения | |
parser.add_argument("--log-dir", type=str, default=DEFAULT_LOG_DIR, | |
help=f"Директория для сохранения логов запусков (по умолчанию: {DEFAULT_LOG_DIR})") | |
parser.add_argument("--intermediate-dir", type=str, default=DEFAULT_INTERMEDIATE_DIR, | |
help=f"Директория для промежуточных результатов pipeline.py (по умолчанию: {DEFAULT_INTERMEDIATE_DIR})") | |
parser.add_argument("--device", type=str, default="cuda:0", | |
help="Устройство для вычислений в pipeline.py (напр., cpu, cuda:0)") | |
parser.add_argument("--python-executable", type=str, default=DEFAULT_PYTHON_EXECUTABLE, | |
help="Путь к интерпретатору Python для запуска pipeline.py") | |
# Параметры, передаваемые в pipeline.py (если не перебираются) | |
parser.add_argument("--data-folder", type=str, default="data/input/docs", help="Папка с документами для pipeline.py") | |
parser.add_argument("--search-dataset-path", type=str, default="data/input/search_dataset_text.xlsx", help="Поисковый датасет для pipeline.py") | |
parser.add_argument("--qa-dataset-path", type=str, default="data/input/question_answering.xlsx", help="QA датасет для pipeline.py") | |
return parser.parse_args() | |
def run_single_pipeline(cmd: list[str], log_path: str): | |
""" | |
Запускает один экземпляр pipeline.py и логирует его вывод. | |
Args: | |
cmd: Список аргументов команды для subprocess. | |
log_path: Путь к файлу для сохранения лога. | |
Returns: | |
Код возврата процесса. | |
""" | |
print(f"\n--- Запуск: {' '.join(cmd)} ---") | |
print(f"--- Лог: {log_path} --- ") | |
start_time = time.time() | |
return_code = -1 | |
try: | |
with open(log_path, "w", encoding="utf-8") as log_file: | |
log_file.write(f"Команда: {' '.join(cmd)}\n") | |
log_file.write(f"Время запуска: {datetime.now()}\n\n") | |
log_file.flush() | |
# Запускаем процесс | |
process = subprocess.Popen( | |
cmd, | |
stdout=subprocess.PIPE, | |
stderr=subprocess.STDOUT, # Перенаправляем stderr в stdout | |
text=True, | |
encoding='utf-8', # Указываем кодировку | |
errors='replace', # Заменяем ошибки кодирования | |
bufsize=1 # Построчная буферизация | |
) | |
# Читаем и пишем вывод построчно | |
for line in process.stdout: | |
print(line, end="") # Выводим в консоль | |
log_file.write(line) # Пишем в лог | |
log_file.flush() | |
# Ждем завершения и получаем код возврата | |
process.wait() | |
return_code = process.returncode | |
except Exception as e: | |
print(f"\nОшибка при запуске процесса: {e}") | |
with open(log_path, "a", encoding="utf-8") as log_file: | |
log_file.write(f"\nОшибка при запуске: {e}\n") | |
return_code = 1 # Считаем ошибкой | |
end_time = time.time() | |
duration = end_time - start_time | |
result_message = f"Успешно завершено за {duration:.2f} сек." | |
if return_code != 0: | |
result_message = f"Завершено с ошибкой (код {return_code}) за {duration:.2f} сек." | |
print(f"--- {result_message} ---") | |
with open(log_path, "a", encoding="utf-8") as log_file: | |
log_file.write(f"\nВремя завершения: {datetime.now()}") | |
log_file.write(f"\nДлительность: {duration:.2f} сек.") | |
log_file.write(f"\nКод возврата: {return_code}\n") | |
return return_code | |
def main(): | |
"""Основная функция скрипта.""" | |
args = parse_args() | |
# --- Генерируем ID для всей серии запусков --- | |
batch_run_id = f"batch_{datetime.now().strftime('%Y%m%d_%H%M%S')}" | |
print(f"Запуск серии экспериментов. Batch ID: {batch_run_id}") | |
# Создаем директории для логов и промежуточных результатов | |
os.makedirs(args.log_dir, exist_ok=True) | |
os.makedirs(args.intermediate_dir, exist_ok=True) | |
# Определяем абсолютный путь к pipeline.py | |
RUN_PIPELINES_SCRIPT_PATH = pathlib.Path(__file__).resolve() | |
SCRIPTS_TESTING_DIR = RUN_PIPELINES_SCRIPT_PATH.parent | |
PIPELINE_SCRIPT_PATH = SCRIPTS_TESTING_DIR / "pipeline.py" | |
# --- Определяем параметры для перебора --- | |
models = [MODELS_TO_TEST[0]] if args.skip_models else MODELS_TO_TEST | |
chunking_configs = [CHUNKING_PARAMS[0]] if args.skip_chunking else CHUNKING_PARAMS | |
top_n_list = [TOP_N_VALUES[0]] if args.skip_top_n else TOP_N_VALUES | |
use_injection_list = [False] if args.skip_injection else USE_INJECTION_OPTIONS | |
threshold_list = [SIMILARITY_THRESHOLDS[0]] if args.skip_thresholds else SIMILARITY_THRESHOLDS | |
# Определяем списки для новых измерений | |
process_tables_list = [PROCESS_TABLES_OPTIONS[0]] if args.skip_process_tables else PROCESS_TABLES_OPTIONS | |
include_neighbors_list = [INCLUDE_NEIGHBORS_OPTIONS[0]] if args.skip_include_neighbors else INCLUDE_NEIGHBORS_OPTIONS | |
use_qe_list = [USE_QE_OPTIONS[0]] if args.skip_qe else USE_QE_OPTIONS | |
# --- Создаем список всех комбинаций параметров --- | |
parameter_combinations = list(product( | |
models, | |
chunking_configs, | |
top_n_list, | |
use_injection_list, | |
threshold_list, | |
process_tables_list, | |
include_neighbors_list, | |
use_qe_list | |
)) | |
total_runs = len(parameter_combinations) | |
print(f"Всего запланировано запусков: {total_runs}") | |
# --- Запускаем пайплайны для каждой комбинации --- | |
completed_runs = 0 | |
failed_runs = 0 | |
start_time_all = time.time() | |
for i, (model, chunk_cfg, top_n, use_injection, threshold, process_tables, include_neighbors, use_qe) in enumerate(parameter_combinations): | |
print(f"\n{'='*80}") | |
print(f"Запуск {i+1}/{total_runs}") | |
print(f" Модель: {model}") | |
# Логируем параметры чанкинга | |
strategy = chunk_cfg['strategy'] | |
params = chunk_cfg['params'] | |
params_str = json.dumps(params, ensure_ascii=False) | |
print(f" Чанкинг: Стратегия='{strategy}', Параметры={params_str}") | |
print(f" Обработка таблиц: {process_tables}") | |
print(f" Top-N: {top_n}") | |
print(f" Сборка контекста: {use_injection}") | |
print(f" Query Expansion: {use_qe}") | |
print(f" Включение соседей: {include_neighbors}") | |
print(f" Порог схожести: {threshold}") | |
print(f"{'='*80}") | |
# Генерируем уникальный ID для этого запуска | |
run_id = f"run_{datetime.now().strftime('%Y%m%d%H%M%S')}_{uuid4().hex[:8]}" | |
# Формируем команду для pipeline.py | |
cmd = [ | |
args.python_executable, | |
str(PIPELINE_SCRIPT_PATH), # Используем абсолютный путь | |
"--run-id", run_id, | |
"--batch-id", batch_run_id, | |
"--data-folder", args.data_folder, | |
"--search-dataset-path", args.search_dataset_path, | |
"--output-dir", args.intermediate_dir, | |
"--model-name", model, | |
"--chunking-strategy", strategy, | |
"--strategy-params", params_str, | |
"--top-n", str(top_n), | |
"--similarity-threshold", str(threshold), | |
"--device", args.device, | |
] | |
# Добавляем флаг --use-injection, если нужно | |
if use_injection: | |
cmd.append("--use-injection") | |
# Добавляем флаг --no-process-tables, если process_tables == False | |
if not process_tables: | |
cmd.append("--no-process-tables") | |
# Добавляем флаг --include-neighbors, если include_neighbors == True | |
if include_neighbors: | |
cmd.append("--include-neighbors") | |
# Добавляем флаг --use-qe, если use_qe == True | |
if use_qe: | |
cmd.append("--use-qe") | |
# Добавляем флаг --use-sentence-transformers для определенных моделей | |
if "FRIDA" in model or "sentence-transformer" in model.lower(): # Пример | |
cmd.append("--use-sentence-transformers") | |
# Формируем путь к лог-файлу | |
log_filename = f"{run_id}_log.txt" | |
log_path = os.path.join(args.log_dir, log_filename) | |
# Запускаем пайплайн | |
return_code = run_single_pipeline(cmd, log_path) | |
if return_code == 0: | |
completed_runs += 1 | |
else: | |
failed_runs += 1 | |
print(f"*** ВНИМАНИЕ: Запуск {i+1} завершился с ошибкой! Лог: {log_path} ***") | |
# --- Вывод итоговой статистики --- | |
end_time_all = time.time() | |
total_duration = end_time_all - start_time_all | |
print(f"\n{'='*80}") | |
print("Все запуски завершены.") | |
print(f"Общее время выполнения: {total_duration:.2f} сек ({total_duration/60:.2f} мин)") | |
print(f"Всего запусков: {total_runs}") | |
print(f"Успешно завершено: {completed_runs}") | |
print(f"Завершено с ошибками: {failed_runs}") | |
print(f"Промежуточные результаты сохранены в: {args.intermediate_dir}") | |
print(f"Логи запусков сохранены в: {args.log_dir}") | |
print(f"{'='*80}") | |
if __name__ == "__main__": | |
main() | |