""" | |
Task to ingest and transform documents to markdown using yourbench | |
""" | |
import os | |
import time | |
import pathlib | |
import subprocess | |
import threading | |
from typing import Optional, List, Tuple, Dict, Any | |
import yaml | |
from loguru import logger | |
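
# The task is driven by a YAML configuration file. Only the keys read in
# run() below are known from this module; the surrounding layout is an
# illustrative assumption, not the full yourbench schema:
#
#   pipeline:
#     ingestion:
#       source_documents_dir: uploaded_files/<session_uid>/uploaded_files  # assumed location
#       output_dir: uploaded_files/<session_uid>/ingested                  # assumed location
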
class CreateBenchTask:
    """
    Task to ingest and transform documents to markdown using yourbench
    """

    def __init__(self, session_uid: str, config_path: Optional[str] = None):
        """
        Initialize the ingestion task

        Args:
            session_uid: Session ID for this task
            config_path: Path to the configuration file, will be generated if None
        """
        self.session_uid = session_uid
        self.logs: List[str] = []
        self.is_completed = False
        self.process = None
        self.is_running_flag = threading.Event()

        # Default config path if not provided
        if config_path is None:
            config_path = f"uploaded_files/{session_uid}/config.yml"
        self.config_path = config_path

        # Command to run yourbench - modified to avoid error with uv run
        self.command = ["yourbench", "run", "--config", str(self.config_path)]

        self._add_log("[INFO] Initializing ingestion task")
        self._add_log(f"[INFO] Using configuration file: {self.config_path}")

    def _add_log(self, message: str) -> None:
        """
        Add a log message to the logs list

        Args:
            message: Log message to add
        """
        if message not in self.logs:  # Avoid duplicates
            self.logs.append(message)
            # Force a copy of the list to avoid reference problems
            self.logs = self.logs.copy()
            # Log to system logs
            logger.info(f"[{self.session_uid}] {message}")

    def get_logs(self) -> List[str]:
        """
        Get all logs for this task

        Returns:
            List of log messages
        """
        return self.logs.copy()  # Return a copy to avoid reference problems

    def is_task_completed(self) -> bool:
        """
        Check if the task is completed

        Returns:
            True if completed, False otherwise
        """
        return self.is_completed

    def is_running(self) -> bool:
        """
        Check if the process is running

        Returns:
            True if running, False otherwise
        """
        return self.is_running_flag.is_set()

    def stop(self) -> None:
        """
        Stop the process if it's running
        """
        if self.process and self.is_running():
            self._add_log("[INFO] Stopping ingestion process")
            try:
                self.process.terminate()
                # Wait 5 seconds for termination
                self.process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                self._add_log("[WARN] Process not responding, forcing termination")
                self.process.kill()
            finally:
                self.is_running_flag.clear()
                self.is_completed = True
                self._add_log("[INFO] Ingestion process stopped")

    def _capture_output(self) -> None:
        """
        Capture and process the output from the yourbench process
        """
        self._add_log("[INFO] Starting output capture")

        try:
            while self.is_running() and self.process:
                line = self.process.stdout.readline()
                if not line:
                    # If no line is read and the process is no longer running
                    if self.process.poll() is not None:
                        self.is_running_flag.clear()
                        break
                    # Otherwise, wait a bit and continue
                    time.sleep(0.1)
                    continue

                # Process the output line
                line = line.strip()
                if line:
                    # Log raw output for debugging
                    self._add_log(f"[DEBUG] Raw output: {line}")

                    # Filter and format the line as needed
                    if "ERROR" in line:
                        self._add_log(f"[ERROR] {line}")
                    elif "WARNING" in line:
                        self._add_log(f"[WARN] {line}")
                    else:
                        # Detect completed stages
                        if "Completed stage:" in line:
                            # Extract the stage name
                            stage = line.split("'")[1] if "'" in line else line.split("Completed stage:")[1].strip()
                            # Standardize stage names to match the frontend
                            stage = self._standardize_stage_name(stage)
                            self._add_log(f"[SUCCESS] Stage completed: {stage}")
                        else:
                            self._add_log(f"[INFO] {line}")

            # Check exit code once the process is finished
            if self.process:
                exit_code = self.process.poll()
                if exit_code == 0:
                    self._add_log("[SUCCESS] Benchmark process completed successfully")
                else:
                    self._add_log(f"[ERROR] Benchmark process terminated with error code: {exit_code}")
        except Exception as e:
            self._add_log(f"[ERROR] Error during output capture: {str(e)}")
        finally:
            self.is_completed = True
            self.is_running_flag.clear()
            self._add_log("[INFO] Output capture completed")

    def _standardize_stage_name(self, stage_name: str) -> str:
        """
        Standardize the stage name to match the frontend expectations

        Args:
            stage_name: Original stage name

        Returns:
            Standardized stage name
        """
        # Mapping table for stage names
        stage_mapping = {
            # Add any required mappings here
            # example: "original_name": "standardized_name"
            "ingest": "ingestion",
            "upload": "upload_ingest_to_hub",
            "summarize": "summarization",
            "chunk": "chunking",
            "generate_questions": "single_shot_question_generation",
        }

        # Look for partial matches
        for key, value in stage_mapping.items():
            if key in stage_name.lower():
                return value

        # If no match is found, return the original name
        return stage_name

    def _simulate_ingestion_process(self) -> None:
        """
        Simulate the ingestion process for development mode
        """
        self._add_log("[INFO] Simulating ingestion process")

        # Simulate the stages using the same names the frontend expects
        steps = [
            ("ingestion", 2),
            ("upload_ingest_to_hub", 3),
            ("summarization", 2),
            ("chunking", 3),
            ("single_shot_question_generation", 4),
        ]

        for step, delay in steps:
            # Add a stage start message
            self._add_log(f"[INFO] Processing {step}...")
            time.sleep(delay)  # Simulate a delay
            # Mark the stage as completed
            self._add_log(f"[SUCCESS] Stage completed: {step}")

        # Mark the task as completed
        self.is_completed = True
        self._add_log("[SUCCESS] Benchmark process completed successfully")

    def run(self, token: Optional[str] = None) -> None:
        """
        Run the ingestion task

        Args:
            token: Hugging Face token
        """
        try:
            self._add_log("[INFO] Starting ingestion process")

            # Check if the configuration file exists
            if not os.path.exists(self.config_path):
                raise FileNotFoundError(f"Configuration file does not exist: {self.config_path}")

            # Examine the configuration to get information
            try:
                with open(self.config_path, "r") as f:
                    config_yaml = yaml.safe_load(f)

                # Get source and destination paths
                source_dir = config_yaml.get("pipeline", {}).get("ingestion", {}).get("source_documents_dir", "")
                output_dir = config_yaml.get("pipeline", {}).get("ingestion", {}).get("output_dir", "")

                if source_dir:
                    self._add_log(f"[INFO] Source directory: {source_dir}")
                if output_dir:
                    self._add_log(f"[INFO] Output directory: {output_dir}")

                # List files to process if the directory exists
                if source_dir and os.path.exists(source_dir):
                    files = os.listdir(source_dir)
                    if files:
                        self._add_log(f"[INFO] Files to process: {', '.join(files)}")
                    else:
                        self._add_log("[WARN] No files found in source directory")
            except Exception as e:
                self._add_log(f"[WARN] Unable to read configuration: {str(e)}")

            # Environment preparation
            env = os.environ.copy()

            # Explicitly define environment variables for authentication
            hf_token = os.getenv("HF_TOKEN")
            if hf_token:
                # Explicitly export these variables for yourbench
                env["HF_TOKEN"] = hf_token
                env["HUGGING_FACE_HUB_TOKEN"] = hf_token
                env["HF_ORGANIZATION"] = os.getenv("HF_ORGANIZATION", "yourbench")
                self._add_log("[INFO] Environment variables exported")

            # In development mode, only simulate ingestion
            if os.environ.get("DEVELOPMENT_MODE", "").lower() == "true":
                self._add_log("[INFO] Development mode enabled, simulating ingestion")
                self._simulate_ingestion_process()
                return

            # Start the process
            self._add_log(f"[INFO] Executing command: {' '.join(self.command)}")
            self.process = subprocess.Popen(
                self.command,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                bufsize=1,
                universal_newlines=True,
                env=env,
            )

            # Mark the process as running
            self.is_running_flag.set()

            # Start a thread to capture output
            output_thread = threading.Thread(target=self._capture_output)
            output_thread.daemon = True
            output_thread.start()

            self._add_log(f"[INFO] Process started with PID: {self.process.pid}")
        except Exception as e:
            self._add_log(f"[ERROR] Error starting ingestion process: {str(e)}")
            self.is_completed = True
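
# Illustrative usage sketch (not part of the original module): how a caller
# might drive CreateBenchTask. The session id and the polling loop below are
# assumptions for demonstration; a config.yml is expected to already exist at
# uploaded_files/<session_uid>/config.yml.
if __name__ == "__main__":
    task = CreateBenchTask(session_uid="demo-session")
    task.run()

    # Relay new log entries until the task reports completion.
    seen = 0
    while not task.is_task_completed():
        logs = task.get_logs()
        for entry in logs[seen:]:
            print(entry)
        seen = len(logs)
        time.sleep(1)

    # Print any remaining entries emitted after the last poll.
    for entry in task.get_logs()[seen:]:
        print(entry)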