Spaces:
Sleeping
Sleeping
File size: 15,067 Bytes
fd485d9 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 |
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Скрипт для запуска множества пайплайнов оценки (`pipeline.py`)
с различными комбинациями параметров.
Собирает команды для `pipeline.py` и запускает их последовательно,
логируя вывод каждого запуска.
"""
import argparse
import json
import os
import pathlib
import subprocess
import sys
import time
from datetime import datetime
from itertools import product
from uuid import uuid4
# --- Конфигурация Экспериментов ---
# Модели для тестирования
MODELS_TO_TEST = [
# "intfloat/e5-base",
# "intfloat/e5-large",
"BAAI/bge-m3",
# "deepvk/USER-bge-m3"
# "ai-forever/FRIDA" # Требует --use-sentence-transformers
]
# Параметры чанкинга (слова / перекрытие)
CHUNKING_PARAMS = [
# Пример для стратегии "fixed_size"
{"strategy": "fixed_size", "params": {"words_per_chunk": 50, "overlap_words": 25}},
# {"strategy": "fixed_size", "params": {"words_per_chunk": 100, "overlap_words": 25}},
# {"strategy": "fixed_size", "params": {"words_per_chunk": 50, "overlap_words": 0}},
# TODO: Добавить другие стратегии и их параметры, если нужно
# {"strategy": "some_other_strategy", "params": {"param1": "value1"}}
]
# Значения Top-N для ретривера
TOP_N_VALUES = [20, 50, 100]
# Использовать ли сборку контекста (InjectionBuilder)
USE_INJECTION_OPTIONS = [False, True]
# Порог схожести для fuzzy сравнения (чанк/пункт)
SIMILARITY_THRESHOLDS = [0.7]
# Опции использования Query Expansion
USE_QE_OPTIONS = [False, True]
# Опции обработки таблиц
PROCESS_TABLES_OPTIONS = [True]
# Опции включения соседей
INCLUDE_NEIGHBORS_OPTIONS = [True]
# --- Настройки Скрипта ---
DEFAULT_LOG_DIR = "logs" # Директория для логов отдельных запусков pipeline.py
DEFAULT_INTERMEDIATE_DIR = "data/intermediate" # Куда pipeline.py сохраняет свои результаты
DEFAULT_PYTHON_EXECUTABLE = sys.executable # Использовать тот же python, что и для запуска этого скрипта
def parse_args():
"""Парсит аргументы командной строки."""
parser = argparse.ArgumentParser(description="Запуск серии оценочных пайплайнов")
# Флаги для пропуска определенных измерений
parser.add_argument("--skip-models", action="store_true",
help="Пропустить итерацию по разным моделям (использовать первую в списке)")
parser.add_argument("--skip-chunking", action="store_true",
help="Пропустить итерацию по разным параметрам чанкинга (использовать первую в списке)")
parser.add_argument("--skip-top-n", action="store_true",
help="Пропустить итерацию по разным top_n (использовать первое значение)")
parser.add_argument("--skip-injection", action="store_true",
help="Пропустить итерацию по опциям сборки контекста (использовать False)")
parser.add_argument("--skip-thresholds", action="store_true",
help="Пропустить итерацию по порогам схожести (использовать первый)")
parser.add_argument("--skip-process-tables", action="store_true",
help="Пропустить итерацию по обработке таблиц (использовать True)")
parser.add_argument("--skip-include-neighbors", action="store_true",
help="Пропустить итерацию по включению соседей (использовать False)")
parser.add_argument("--skip-qe", action="store_true",
help="Пропустить итерацию по использованию Query Expansion (использовать False)")
# Настройки путей и выполнения
parser.add_argument("--log-dir", type=str, default=DEFAULT_LOG_DIR,
help=f"Директория для сохранения логов запусков (по умолчанию: {DEFAULT_LOG_DIR})")
parser.add_argument("--intermediate-dir", type=str, default=DEFAULT_INTERMEDIATE_DIR,
help=f"Директория для промежуточных результатов pipeline.py (по умолчанию: {DEFAULT_INTERMEDIATE_DIR})")
parser.add_argument("--device", type=str, default="cuda:0",
help="Устройство для вычислений в pipeline.py (напр., cpu, cuda:0)")
parser.add_argument("--python-executable", type=str, default=DEFAULT_PYTHON_EXECUTABLE,
help="Путь к интерпретатору Python для запуска pipeline.py")
# Параметры, передаваемые в pipeline.py (если не перебираются)
parser.add_argument("--data-folder", type=str, default="data/input/docs", help="Папка с документами для pipeline.py")
parser.add_argument("--search-dataset-path", type=str, default="data/input/search_dataset_text.xlsx", help="Поисковый датасет для pipeline.py")
parser.add_argument("--qa-dataset-path", type=str, default="data/input/question_answering.xlsx", help="QA датасет для pipeline.py")
return parser.parse_args()
def run_single_pipeline(cmd: list[str], log_path: str):
"""
Запускает один экземпляр pipeline.py и логирует его вывод.
Args:
cmd: Список аргументов команды для subprocess.
log_path: Путь к файлу для сохранения лога.
Returns:
Код возврата процесса.
"""
print(f"\n--- Запуск: {' '.join(cmd)} ---")
print(f"--- Лог: {log_path} --- ")
start_time = time.time()
return_code = -1
try:
with open(log_path, "w", encoding="utf-8") as log_file:
log_file.write(f"Команда: {' '.join(cmd)}\n")
log_file.write(f"Время запуска: {datetime.now()}\n\n")
log_file.flush()
# Запускаем процесс
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, # Перенаправляем stderr в stdout
text=True,
encoding='utf-8', # Указываем кодировку
errors='replace', # Заменяем ошибки кодирования
bufsize=1 # Построчная буферизация
)
# Читаем и пишем вывод построчно
for line in process.stdout:
print(line, end="") # Выводим в консоль
log_file.write(line) # Пишем в лог
log_file.flush()
# Ждем завершения и получаем код возврата
process.wait()
return_code = process.returncode
except Exception as e:
print(f"\nОшибка при запуске процесса: {e}")
with open(log_path, "a", encoding="utf-8") as log_file:
log_file.write(f"\nОшибка при запуске: {e}\n")
return_code = 1 # Считаем ошибкой
end_time = time.time()
duration = end_time - start_time
result_message = f"Успешно завершено за {duration:.2f} сек."
if return_code != 0:
result_message = f"Завершено с ошибкой (код {return_code}) за {duration:.2f} сек."
print(f"--- {result_message} ---")
with open(log_path, "a", encoding="utf-8") as log_file:
log_file.write(f"\nВремя завершения: {datetime.now()}")
log_file.write(f"\nДлительность: {duration:.2f} сек.")
log_file.write(f"\nКод возврата: {return_code}\n")
return return_code
def main():
"""Основная функция скрипта."""
args = parse_args()
# --- Генерируем ID для всей серии запусков ---
batch_run_id = f"batch_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
print(f"Запуск серии экспериментов. Batch ID: {batch_run_id}")
# Создаем директории для логов и промежуточных результатов
os.makedirs(args.log_dir, exist_ok=True)
os.makedirs(args.intermediate_dir, exist_ok=True)
# Определяем абсолютный путь к pipeline.py
RUN_PIPELINES_SCRIPT_PATH = pathlib.Path(__file__).resolve()
SCRIPTS_TESTING_DIR = RUN_PIPELINES_SCRIPT_PATH.parent
PIPELINE_SCRIPT_PATH = SCRIPTS_TESTING_DIR / "pipeline.py"
# --- Определяем параметры для перебора ---
models = [MODELS_TO_TEST[0]] if args.skip_models else MODELS_TO_TEST
chunking_configs = [CHUNKING_PARAMS[0]] if args.skip_chunking else CHUNKING_PARAMS
top_n_list = [TOP_N_VALUES[0]] if args.skip_top_n else TOP_N_VALUES
use_injection_list = [False] if args.skip_injection else USE_INJECTION_OPTIONS
threshold_list = [SIMILARITY_THRESHOLDS[0]] if args.skip_thresholds else SIMILARITY_THRESHOLDS
# Определяем списки для новых измерений
process_tables_list = [PROCESS_TABLES_OPTIONS[0]] if args.skip_process_tables else PROCESS_TABLES_OPTIONS
include_neighbors_list = [INCLUDE_NEIGHBORS_OPTIONS[0]] if args.skip_include_neighbors else INCLUDE_NEIGHBORS_OPTIONS
use_qe_list = [USE_QE_OPTIONS[0]] if args.skip_qe else USE_QE_OPTIONS
# --- Создаем список всех комбинаций параметров ---
parameter_combinations = list(product(
models,
chunking_configs,
top_n_list,
use_injection_list,
threshold_list,
process_tables_list,
include_neighbors_list,
use_qe_list
))
total_runs = len(parameter_combinations)
print(f"Всего запланировано запусков: {total_runs}")
# --- Запускаем пайплайны для каждой комбинации ---
completed_runs = 0
failed_runs = 0
start_time_all = time.time()
for i, (model, chunk_cfg, top_n, use_injection, threshold, process_tables, include_neighbors, use_qe) in enumerate(parameter_combinations):
print(f"\n{'='*80}")
print(f"Запуск {i+1}/{total_runs}")
print(f" Модель: {model}")
# Логируем параметры чанкинга
strategy = chunk_cfg['strategy']
params = chunk_cfg['params']
params_str = json.dumps(params, ensure_ascii=False)
print(f" Чанкинг: Стратегия='{strategy}', Параметры={params_str}")
print(f" Обработка таблиц: {process_tables}")
print(f" Top-N: {top_n}")
print(f" Сборка контекста: {use_injection}")
print(f" Query Expansion: {use_qe}")
print(f" Включение соседей: {include_neighbors}")
print(f" Порог схожести: {threshold}")
print(f"{'='*80}")
# Генерируем уникальный ID для этого запуска
run_id = f"run_{datetime.now().strftime('%Y%m%d%H%M%S')}_{uuid4().hex[:8]}"
# Формируем команду для pipeline.py
cmd = [
args.python_executable,
str(PIPELINE_SCRIPT_PATH), # Используем абсолютный путь
"--run-id", run_id,
"--batch-id", batch_run_id,
"--data-folder", args.data_folder,
"--search-dataset-path", args.search_dataset_path,
"--output-dir", args.intermediate_dir,
"--model-name", model,
"--chunking-strategy", strategy,
"--strategy-params", params_str,
"--top-n", str(top_n),
"--similarity-threshold", str(threshold),
"--device", args.device,
]
# Добавляем флаг --use-injection, если нужно
if use_injection:
cmd.append("--use-injection")
# Добавляем флаг --no-process-tables, если process_tables == False
if not process_tables:
cmd.append("--no-process-tables")
# Добавляем флаг --include-neighbors, если include_neighbors == True
if include_neighbors:
cmd.append("--include-neighbors")
# Добавляем флаг --use-qe, если use_qe == True
if use_qe:
cmd.append("--use-qe")
# Добавляем флаг --use-sentence-transformers для определенных моделей
if "FRIDA" in model or "sentence-transformer" in model.lower(): # Пример
cmd.append("--use-sentence-transformers")
# Формируем путь к лог-файлу
log_filename = f"{run_id}_log.txt"
log_path = os.path.join(args.log_dir, log_filename)
# Запускаем пайплайн
return_code = run_single_pipeline(cmd, log_path)
if return_code == 0:
completed_runs += 1
else:
failed_runs += 1
print(f"*** ВНИМАНИЕ: Запуск {i+1} завершился с ошибкой! Лог: {log_path} ***")
# --- Вывод итоговой статистики ---
end_time_all = time.time()
total_duration = end_time_all - start_time_all
print(f"\n{'='*80}")
print("Все запуски завершены.")
print(f"Общее время выполнения: {total_duration:.2f} сек ({total_duration/60:.2f} мин)")
print(f"Всего запусков: {total_runs}")
print(f"Успешно завершено: {completed_runs}")
print(f"Завершено с ошибками: {failed_runs}")
print(f"Промежуточные результаты сохранены в: {args.intermediate_dir}")
print(f"Логи запусков сохранены в: {args.log_dir}")
print(f"{'='*80}")
if __name__ == "__main__":
main()
|