# demo/backend/tasks/create_bench_config_file.py
# tfrere's picture
# update lighteval results
# 39acd70
# raw
# history blame
# 10.9 kB
"""
Task to create and save the configuration file
"""
import os
import pathlib
import uuid
import yaml
import shutil
import time
import threading
from typing import Optional, Dict, Any, List, Tuple
from loguru import logger
from huggingface_hub import HfApi
class CreateBenchConfigTask:
    """
    Task to create and save a configuration file for YourbenchSimpleDemo

    The task keeps an in-memory list of log messages (see ``get_logs``) and
    exposes simple state queries (``is_running`` / ``is_task_completed``) so
    that a caller can follow its progress.
    """

    def __init__(self, session_uid: Optional[str] = None):
        """
        Initialize the task with a session ID

        Args:
            session_uid: Optional session ID, will be generated if None
        """
        self.session_uid = session_uid or str(uuid.uuid4())
        self.logs: List[str] = []
        self.is_completed = False
        self.is_running_flag = threading.Event()
        # Kept for interface compatibility; run() currently executes
        # synchronously and never assigns a thread here.
        self.thread = None

        self._add_log("[INFO] Initializing configuration creation task")

    def _add_log(self, message: str) -> None:
        """
        Add a log message to the logs list

        A message that is already present in the list is ignored, so each
        distinct message appears at most once.

        Args:
            message: Log message to add
        """
        if message not in self.logs:  # Avoid duplicates
            self.logs.append(message)
            # Force a copy of the list to avoid reference issues
            self.logs = self.logs.copy()
            # Log to system logs
            logger.info(f"[{self.session_uid}] {message}")

    def get_logs(self) -> List[str]:
        """
        Get all logs for this task

        Returns:
            List of log messages
        """
        # Return a copy to avoid reference issues
        return self.logs.copy()

    def save_uploaded_file(self, file_path: str) -> str:
        """
        Process the uploaded file that is already in the correct directory

        Args:
            file_path: Path to the uploaded file

        Returns:
            Path to the file (same as input)

        Raises:
            RuntimeError: If the file path cannot be processed
        """
        try:
            # The file is already in the correct location: uploaded_files/{session_id}/uploaded_files/
            # Just log that we're processing it and return the path
            self._add_log(f"[INFO] Processing file: {os.path.basename(file_path)}")
            return file_path
        except Exception as e:
            error_msg = f"Error processing file: {str(e)}"
            self._add_log(f"[ERROR] {error_msg}")
            raise RuntimeError(error_msg) from e

    def generate_base_config(self, hf_org: str, hf_dataset_name: str) -> Dict[str, Any]:
        """
        Create the base configuration dictionary

        Args:
            hf_org: Hugging Face organization name. Not written into the
                configuration: the "$HF_ORGANIZATION" placeholder is emitted
                instead.
            hf_dataset_name: Hugging Face dataset name

        Returns:
            Configuration dictionary

        Raises:
            RuntimeError: If the HF_TOKEN environment variable is not defined
        """
        self._add_log(f"[INFO] Generating base configuration for {hf_dataset_name}")

        # Check if HF token is available
        hf_token = os.getenv("HF_TOKEN")
        if not hf_token:
            raise RuntimeError("HF_TOKEN environment variable is not defined")

        return {
            "hf_configuration": {
                # Placeholder only: the actual token value is never written
                # into the configuration.
                "token": "$HF_TOKEN",
                "hf_organization": "$HF_ORGANIZATION",
                "private": True,
                "hf_dataset_name": hf_dataset_name,
                "concat_if_exist": False,
            },
            "model_list": [
                {
                    "model_name": "Qwen/Qwen2.5-VL-72B-Instruct",
                    "provider": "novita",
                    "api_key": "$HF_TOKEN",
                    "max_concurrent_requests": 32,
                },
                {
                    "model_name": "Qwen/Qwen2.5-72B-Instruct",
                    "provider": "novita",
                    "api_key": "$HF_TOKEN",
                    "max_concurrent_requests": 32,
                },
            ],
            "model_roles": {
                "ingestion": ["Qwen/Qwen2.5-VL-72B-Instruct"],
                "summarization": ["Qwen/Qwen2.5-72B-Instruct"],
                "chunking": ["intfloat/multilingual-e5-large-instruct"],
                "single_shot_question_generation": ["Qwen/Qwen2.5-72B-Instruct"],
                "multi_hop_question_generation": ["Qwen/Qwen2.5-72B-Instruct"],
            },
            "pipeline": {
                "ingestion": {
                    "source_documents_dir": f"uploaded_files/{self.session_uid}/uploaded_files/",
                    "output_dir": f"uploaded_files/{self.session_uid}/ingested",
                    "run": True,
                },
                "upload_ingest_to_hub": {
                    "source_documents_dir": f"uploaded_files/{self.session_uid}/ingested",
                    "run": True,  # Re-enabled for uploading to the Hub
                },
                "summarization": {
                    "run": True,
                },
                "chunking": {
                    "run": True,
                    "chunking_configuration": {
                        "l_min_tokens": 64,
                        "l_max_tokens": 128,
                        "tau_threshold": 0.8,
                        "h_min": 2,
                        "h_max": 5,
                        "num_multihops_factor": 1,
                    },
                },
                "single_shot_question_generation": {
                    "run": True,
                    "additional_instructions": "Generate questions to test a curious adult",
                    "chunk_sampling": {
                        "mode": "count",
                        "value": 10,
                        "random_seed": 123,
                    },
                },
                "multi_hop_question_generation": {
                    "run": False,
                },
                "lighteval": {
                    "run": False,
                },
            },
        }

    def save_yaml_file(self, config: Dict[str, Any], path: str) -> str:
        """
        Save the given configuration dictionary to a YAML file

        Args:
            config: Configuration dictionary
            path: Path to save the file

        Returns:
            Path to the saved file

        Raises:
            RuntimeError: If the directory cannot be created or the file
                cannot be written
        """
        try:
            # Create directory if it doesn't exist. A bare file name has an
            # empty dirname, and os.makedirs("") raises, so skip it then.
            parent_dir = os.path.dirname(path)
            if parent_dir:
                os.makedirs(parent_dir, exist_ok=True)

            with open(path, "w", encoding="utf-8") as file:
                yaml.dump(config, file, default_flow_style=False, sort_keys=False)

            self._add_log(f"[INFO] Configuration saved: {path}")
            return path
        except Exception as e:
            error_msg = f"Error saving configuration: {str(e)}"
            self._add_log(f"[ERROR] {error_msg}")
            raise RuntimeError(error_msg) from e

    def _run_task(self, file_path: str) -> str:
        """
        Internal method that performs the configuration generation steps

        On both success and failure the task is marked as completed.

        Args:
            file_path: Path to the uploaded file

        Returns:
            Path to the configuration file

        Raises:
            RuntimeError: If any step fails (including a missing HF_TOKEN)
        """
        try:
            # Use the default yourbench organization
            org_name = os.getenv("HF_ORGANIZATION")

            # Check if HF token is available
            hf_token = os.getenv("HF_TOKEN")
            if not hf_token:
                raise RuntimeError("HF_TOKEN environment variable is not defined")

            self._add_log(f"[INFO] Organization: {org_name}")
            time.sleep(0.5)  # Simulate delay

            # Process the uploaded file (logs it; the path itself is unchanged)
            self.save_uploaded_file(file_path)
            time.sleep(1)  # Simulate delay

            # Path for the config file
            config_dir = pathlib.Path(f"uploaded_files/{self.session_uid}")
            config_path = config_dir / "config.yml"

            # Generate dataset name based on session ID
            dataset_name = f"yourbench_{self.session_uid}"
            self._add_log(f"[INFO] Dataset name: {dataset_name}")
            time.sleep(0.8)  # Simulate delay

            # Generate and save the configuration
            config = self.generate_base_config(org_name, dataset_name)
            time.sleep(1.2)  # Simulate delay

            config_file_path = self.save_yaml_file(config, str(config_path))
            self._add_log(f"[INFO] Configuration generated successfully: {config_file_path}")

            # Simulate additional processing
            time.sleep(1.5)  # Simulate delay
            self._add_log("[INFO] Starting ingestion")
            time.sleep(2)  # Simulate delay
            self._add_log(f"[INFO] Processing file: {dataset_name}")
            time.sleep(2)  # Simulate delay
            self._add_log("[SUCCESS] Stage completed: config_generation")

            # Task finished
            self.mark_task_completed()
            return str(config_path)
        except Exception as e:
            error_msg = f"Error generating configuration: {str(e)}"
            self._add_log(f"[ERROR] {error_msg}")
            self.mark_task_completed()
            raise RuntimeError(error_msg) from e

    def run(self, file_path: str, token: Optional[str] = None) -> str:
        """
        Run the task to create and save the configuration file

        The work is executed synchronously in the calling thread.

        Args:
            file_path: Path to the uploaded file
            token: Hugging Face token (not used, using HF_TOKEN from environment)

        Returns:
            Path to the configuration file

        Raises:
            RuntimeError: If the configuration could not be generated
        """
        # Mark the task as running
        self.is_running_flag.set()

        # Run the task directly without threading
        try:
            return self._run_task(file_path)
        except RuntimeError:
            # _run_task has already logged the error, marked the task as
            # completed and prefixed the message; wrapping it again would
            # duplicate the prefix and the [ERROR] log entry.
            raise
        except Exception as e:
            # Defensive: anything that escaped _run_task's own handling.
            error_msg = f"Error generating configuration: {str(e)}"
            self._add_log(f"[ERROR] {error_msg}")
            self.mark_task_completed()
            raise RuntimeError(error_msg) from e

    def is_running(self) -> bool:
        """
        Check if the task is running

        Returns:
            True if running, False otherwise
        """
        return self.is_running_flag.is_set() and not self.is_completed

    def is_task_completed(self) -> bool:
        """
        Check if the task is completed

        Returns:
            True if completed, False otherwise
        """
        return self.is_completed

    def mark_task_completed(self) -> None:
        """
        Mark the task as completed
        """
        self.is_completed = True
        self.is_running_flag.clear()
        self._add_log("[INFO] Configuration generation task completed")