"""
Task to create and save the configuration file
"""
import os
import pathlib
import uuid
import yaml
import shutil
import time
import threading
from typing import Optional, Dict, Any, List, Tuple
from loguru import logger
from huggingface_hub import HfApi
class CreateBenchConfigTask:
    """
    Task to create and save a configuration file for YourbenchSimpleDemo.

    The task runs asynchronously in a daemon thread: ``run()`` returns
    immediately with the expected config path, while ``_run_task()`` builds
    and writes the YAML configuration in the background. Progress is exposed
    through an append-only log list (``get_logs``) and two state probes
    (``is_running`` / ``is_task_completed``).
    """

    def __init__(self, session_uid: Optional[str] = None):
        """
        Initialize the task with a session ID.

        Args:
            session_uid: Optional session ID, will be generated if None
        """
        self.session_uid = session_uid or str(uuid.uuid4())
        self.logs: List[str] = []
        self.is_completed = False
        # Event is set while the background thread is (supposed to be) running.
        self.is_running_flag = threading.Event()
        self.thread: Optional[threading.Thread] = None
        self._add_log("[INFO] Initializing configuration creation task")

    def _add_log(self, message: str) -> None:
        """
        Add a log message to the logs list and mirror it to the system logger.

        Args:
            message: Log message to add
        """
        if message not in self.logs:  # Avoid duplicates
            self.logs.append(message)
            # Rebind to a fresh list so consumers holding the previous list
            # object do not observe in-place mutation (reference isolation).
            self.logs = self.logs.copy()
        # Mirror to system logs, tagged with the session for traceability.
        logger.info(f"[{self.session_uid}] {message}")

    def get_logs(self) -> List[str]:
        """
        Get all logs for this task.

        Returns:
            List of log messages (a copy, to avoid reference issues)
        """
        return self.logs.copy()

    def save_uploaded_file(self, file_path: str) -> str:
        """
        Process the uploaded file that is already in the correct directory.

        Args:
            file_path: Path to the uploaded file

        Returns:
            Path to the file (same as input)

        Raises:
            RuntimeError: If processing fails for any reason.
        """
        try:
            # The file is already in the correct location:
            # uploaded_files/{session_id}/uploaded_files/
            # Just log that we're processing it and return the path.
            self._add_log(f"[INFO] Processing file: {os.path.basename(file_path)}")
            return file_path
        except Exception as e:
            error_msg = f"Error processing file: {str(e)}"
            self._add_log(f"[ERROR] {error_msg}")
            raise RuntimeError(error_msg)

    def generate_base_config(self, hf_org: str, hf_dataset_name: str) -> Dict[str, Any]:
        """
        Create the base configuration dictionary.

        Args:
            hf_org: Hugging Face organization name.
                NOTE(review): currently unused — the config embeds the
                "$HF_ORGANIZATION" placeholder instead; confirm whether the
                parameter should be injected here.
            hf_dataset_name: Hugging Face dataset name

        Returns:
            Configuration dictionary

        Raises:
            RuntimeError: If the HF_TOKEN environment variable is not set.
        """
        self._add_log(f"[INFO] Generating base configuration for {hf_dataset_name}")
        # Fail fast if no HF token is available; the pipeline cannot run without it.
        hf_token = os.getenv("HF_TOKEN")
        if not hf_token:
            raise RuntimeError("HF_TOKEN environment variable is not defined")
        return {
            "hf_configuration": {
                "token": "$HF_TOKEN",  # Resolved from the environment downstream
                "hf_organization": "$HF_ORGANIZATION",
                "private": True,
                "hf_dataset_name": hf_dataset_name,
                "concat_if_exist": False,
            },
            "model_list": [
                {
                    "model_name": "Qwen/Qwen2.5-VL-72B-Instruct",
                    "provider": "novita",
                    "api_key": "$HF_TOKEN",
                    "max_concurrent_requests": 32,
                },
                {
                    "model_name": "Qwen/Qwen2.5-72B-Instruct",
                    "provider": "novita",
                    "api_key": "$HF_TOKEN",
                    "max_concurrent_requests": 32,
                },
            ],
            "model_roles": {
                "ingestion": ["Qwen/Qwen2.5-VL-72B-Instruct"],
                "summarization": ["Qwen/Qwen2.5-72B-Instruct"],
                "chunking": ["intfloat/multilingual-e5-large-instruct"],
                "single_shot_question_generation": ["Qwen/Qwen2.5-72B-Instruct"],
                "multi_hop_question_generation": ["Qwen/Qwen2.5-72B-Instruct"],
            },
            "pipeline": {
                "ingestion": {
                    "source_documents_dir": f"uploaded_files/{self.session_uid}/uploaded_files/",
                    "output_dir": f"uploaded_files/{self.session_uid}/ingested",
                    "run": True,
                },
                "upload_ingest_to_hub": {
                    "source_documents_dir": f"uploaded_files/{self.session_uid}/ingested",
                    "run": True,  # Re-enabled for uploading to the Hub
                },
                "summarization": {
                    "run": True,
                },
                "chunking": {
                    "run": True,
                    "chunking_configuration": {
                        "l_min_tokens": 64,
                        "l_max_tokens": 128,
                        "tau_threshold": 0.8,
                        "h_min": 2,
                        "h_max": 5,
                        "num_multihops_factor": 2,
                    },
                },
                "single_shot_question_generation": {
                    "run": True,
                    "additional_instructions": "Generate questions to test a curious adult",
                    "chunk_sampling": {
                        "mode": "count",
                        "value": 5,
                        "random_seed": 123,
                    },
                },
                "multi_hop_question_generation": {
                    "run": True,
                    "additional_instructions": "Generate questions to test a curious adult",
                    "chunk_sampling": {
                        "mode": "percentage",
                        "value": 0.3,
                        "random_seed": 42,
                    },
                },
                "lighteval": {
                    "run": True,
                },
            },
        }

    def save_yaml_file(self, config: Dict[str, Any], path: str) -> str:
        """
        Save the given configuration dictionary to a YAML file.

        Args:
            config: Configuration dictionary
            path: Path to save the file

        Returns:
            Path to the saved file

        Raises:
            RuntimeError: If the file cannot be written.
        """
        try:
            # Create the parent directory if it doesn't exist.
            # Guard against a bare filename: os.makedirs("") raises
            # FileNotFoundError, so only create when there is a dirname.
            directory = os.path.dirname(path)
            if directory:
                os.makedirs(directory, exist_ok=True)
            # Explicit encoding so the output does not depend on the platform
            # default; sort_keys=False preserves the section order above.
            with open(path, "w", encoding="utf-8") as file:
                yaml.dump(config, file, default_flow_style=False, sort_keys=False)
            self._add_log(f"[INFO] Configuration saved: {path}")
            return path
        except Exception as e:
            error_msg = f"Error saving configuration: {str(e)}"
            self._add_log(f"[ERROR] {error_msg}")
            raise RuntimeError(error_msg)

    def _run_task(self, file_path: str) -> str:
        """
        Internal method to run the task in a separate thread.

        NOTE(review): exceptions raised here propagate only inside the daemon
        thread (visible via the [ERROR] log entry), not to the caller of
        ``run()`` — confirm callers poll the logs for failures.

        Args:
            file_path: Path to the uploaded file

        Returns:
            Path to the configuration file

        Raises:
            RuntimeError: If HF_TOKEN is missing or any stage fails.
        """
        try:
            # Use the default yourbench organization from the environment.
            org_name = os.getenv("HF_ORGANIZATION")
            # Fail fast if no HF token is available.
            hf_token = os.getenv("HF_TOKEN")
            if not hf_token:
                raise RuntimeError("HF_TOKEN environment variable is not defined")
            self._add_log(f"[INFO] Organization: {org_name}")
            time.sleep(0.5)  # Simulate delay
            # Save the uploaded file
            saved_file_path = self.save_uploaded_file(file_path)
            time.sleep(1)  # Simulate delay
            # Path for the config file
            config_dir = pathlib.Path(f"uploaded_files/{self.session_uid}")
            config_path = config_dir / "config.yml"
            # Generate dataset name based on session ID
            dataset_name = f"yourbench_{self.session_uid}"
            self._add_log(f"[INFO] Dataset name: {dataset_name}")
            time.sleep(0.8)  # Simulate delay
            # Generate and save the configuration
            config = self.generate_base_config(org_name, dataset_name)
            time.sleep(1.2)  # Simulate delay
            config_file_path = self.save_yaml_file(config, str(config_path))
            self._add_log(f"[INFO] Configuration generated successfully: {config_file_path}")
            # Simulate additional processing
            time.sleep(1.5)  # Simulate delay
            self._add_log("[INFO] Starting ingestion")
            time.sleep(2)  # Simulate delay
            self._add_log(f"[INFO] Processing file: {dataset_name}")
            time.sleep(2)  # Simulate delay
            self._add_log("[SUCCESS] Stage completed: config_generation")
            # Task completed
            self.mark_task_completed()
            return str(config_path)
        except Exception as e:
            error_msg = f"Error generating configuration: {str(e)}"
            self._add_log(f"[ERROR] {error_msg}")
            # Mark completed even on failure so is_running() stops reporting True.
            self.mark_task_completed()
            raise RuntimeError(error_msg)

    def run(self, file_path: str, token: Optional[str] = None) -> str:
        """
        Run the task to create and save the configuration file asynchronously.

        Args:
            file_path: Path to the uploaded file
            token: Hugging Face token (not used, using HF_TOKEN from environment)

        Returns:
            Path to the configuration file (expected path — the file is
            written asynchronously by the background thread).
        """
        # Mark the task as running before the thread starts so pollers
        # never observe a not-running gap.
        self.is_running_flag.set()
        # Start the task in a daemon thread so it never blocks interpreter exit.
        self.thread = threading.Thread(target=self._run_task, args=(file_path,))
        self.thread.daemon = True
        self.thread.start()
        # Return the expected config path immediately.
        return f"uploaded_files/{self.session_uid}/config.yml"

    def is_running(self) -> bool:
        """
        Check if the task is running.

        Returns:
            True if running, False otherwise
        """
        return self.is_running_flag.is_set() and not self.is_completed

    def is_task_completed(self) -> bool:
        """
        Check if the task is completed.

        Returns:
            True if completed, False otherwise
        """
        return self.is_completed

    def mark_task_completed(self) -> None:
        """
        Mark the task as completed and clear the running flag.
        """
        self.is_completed = True
        self.is_running_flag.clear()
        self._add_log("[INFO] Configuration generation task completed")