Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
""" | |
Task to ingest and transform documents to markdown using yourbench | |
""" | |
import os | |
import time | |
import pathlib | |
import subprocess | |
import threading | |
from typing import Optional, List, Tuple, Dict, Any | |
import yaml | |
from loguru import logger | |
class CreateBenchTask:
    """
    Task to ingest and transform documents to markdown using yourbench.

    The task launches the ``yourbench run --config <path>`` command as a
    subprocess, captures its combined stdout/stderr on a background thread and
    stores formatted log lines that can be polled with :meth:`get_logs`.
    When the ``DEVELOPMENT_MODE`` environment variable is ``"true"`` the
    subprocess is not started and a simulated run is logged instead.
    """

    def __init__(self, session_uid: str, config_path: Optional[str] = None):
        """
        Initialize the ingestion task

        Args:
            session_uid: Session ID for this task
            config_path: Path to the configuration file; defaults to
                ``uploaded_files/<session_uid>/config.yml`` if None
        """
        self.session_uid = session_uid
        self.logs: List[str] = []
        self.is_completed = False
        self.process = None
        self.is_running_flag = threading.Event()
        # _add_log is called from the caller's thread and from the worker
        # threads started by this class; the lock keeps check/append/rebind
        # atomic so that no message is lost.
        self._log_lock = threading.Lock()

        # Default config path if not provided
        if config_path is None:
            config_path = f"uploaded_files/{session_uid}/config.yml"
        self.config_path = config_path

        # Command to run yourbench - modified to avoid error with uv run
        self.command = ["yourbench", "run", "--config", str(self.config_path)]

        self._add_log("[INFO] Initializing ingestion task")
        self._add_log(f"[INFO] Using configuration file: {self.config_path}")

    def _add_log(self, message: str) -> None:
        """
        Add a log message to the logs list

        A message that is already present in the list is ignored (it is
        neither stored again nor forwarded to the system logger).

        Args:
            message: Log message to add
        """
        with self._log_lock:
            is_new = message not in self.logs  # Avoid duplicates
            if is_new:
                self.logs.append(message)
                # Force copy of the list to avoid reference problems
                self.logs = self.logs.copy()
        if is_new:
            # Log to system logs (outside the lock: no need to hold it for I/O)
            logger.info(f"[{self.session_uid}] {message}")

    def get_logs(self) -> List[str]:
        """
        Get all logs for this task

        Returns:
            A copy of the list of log messages
        """
        return self.logs.copy()  # Return a copy to avoid reference problems

    def is_task_completed(self) -> bool:
        """
        Check if the task is completed

        Returns:
            True if completed, False otherwise
        """
        return self.is_completed

    def is_running(self) -> bool:
        """
        Check if the process is running

        Returns:
            True if the running flag is set, False otherwise
        """
        return self.is_running_flag.is_set()

    def stop(self) -> None:
        """
        Stop the process if it's running

        Sends a terminate request, waits up to 5 seconds and falls back to a
        kill if the process has not exited. Does nothing when no process was
        started or the running flag is not set.
        """
        if self.process and self.is_running():
            self._add_log("[INFO] Stopping ingestion process")
            try:
                self.process.terminate()
                # Wait 5 seconds for termination
                self.process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                self._add_log("[WARN] Process not responding, forcing termination")
                self.process.kill()
                # Reap the killed child so it does not linger as a zombie
                self.process.wait()
            finally:
                self.is_running_flag.clear()
                self.is_completed = True
                self._add_log("[INFO] Ingestion process stopped")

    def _log_output_line(self, line: str) -> None:
        """
        Record one non-empty, stripped output line from the subprocess

        The raw line is always logged at DEBUG level, then logged again with a
        level derived from its content.

        Args:
            line: Stripped output line
        """
        # Log raw output for debugging
        self._add_log(f"[DEBUG] Raw output: {line}")

        # Filter and format the line as needed
        if "ERROR" in line:
            self._add_log(f"[ERROR] {line}")
        elif "WARNING" in line:
            self._add_log(f"[WARN] {line}")
        elif "Completed stage:" in line:
            # Detect completed stages; the stage name is the first quoted token
            stage = line.split("'")[1] if "'" in line else line
            self._add_log(f"[SUCCESS] Stage completed: {stage}")
        else:
            self._add_log(f"[INFO] {line}")

    def _capture_output(self) -> None:
        """
        Capture and process the output from the yourbench process

        Runs on the background thread started by :meth:`run`. Reads stdout
        line by line until the process has exited or the running flag is
        cleared, then logs the exit status and marks the task as completed.
        """
        self._add_log("[INFO] Starting output capture")
        process = self.process
        try:
            while self.is_running() and process:
                line = process.stdout.readline()
                if not line:
                    # If no line is read and the process is no longer running
                    if process.poll() is not None:
                        self.is_running_flag.clear()
                        break
                    # Otherwise, wait a bit and continue
                    time.sleep(0.1)
                    continue

                # Process the output line
                line = line.strip()
                if line:
                    self._log_output_line(line)

            # Check exit code once the process is finished
            if process:
                exit_code = process.poll()
                if exit_code == 0:
                    self._add_log("[SUCCESS] Ingestion process completed successfully")
                else:
                    self._add_log(f"[ERROR] Ingestion process terminated with error code: {exit_code}")
        except Exception as e:
            self._add_log(f"[ERROR] Error during output capture: {str(e)}")
        finally:
            # Release the pipe's file descriptor; nothing reads it after this
            if process and process.stdout:
                try:
                    process.stdout.close()
                except OSError:
                    pass
            self.is_completed = True
            self.is_running_flag.clear()
            self._add_log("[INFO] Output capture completed")

    def _load_ingestion_config(self) -> Dict[str, Any]:
        """
        Read the ``pipeline.ingestion`` section of the configuration file

        Returns:
            The ingestion section, or an empty dict when the keys are absent

        Raises:
            Exception: Any error raised while opening or parsing the file, or
                while navigating a document that is not a mapping; callers
                treat this step as best-effort and catch it.
        """
        with open(self.config_path, "r", encoding="utf-8") as f:
            config_yaml = yaml.safe_load(f)
        return config_yaml.get("pipeline", {}).get("ingestion", {})

    def _build_env(self, token: Optional[str] = None) -> Dict[str, str]:
        """
        Build the environment passed to the yourbench subprocess

        Args:
            token: Hugging Face token; when empty or None the ``HF_TOKEN``
                environment variable is used instead

        Returns:
            A copy of the current environment. When a token is available,
            ``HF_TOKEN``, ``HUGGING_FACE_HUB_TOKEN`` and ``HF_ORGANIZATION``
            (default ``"yourbench"``) are set in it.
        """
        env = os.environ.copy()
        hf_token = token or os.getenv("HF_TOKEN")
        if hf_token:
            # Explicitly export these variables for yourbench
            env["HF_TOKEN"] = hf_token
            env["HUGGING_FACE_HUB_TOKEN"] = hf_token
            env["HF_ORGANIZATION"] = os.getenv("HF_ORGANIZATION", "yourbench")
        return env

    def run(self, token: Optional[str] = None) -> None:
        """
        Run the ingestion task

        Starts the yourbench subprocess and a daemon thread that captures its
        output, then returns immediately. Errors raised while starting are
        logged and mark the task as completed instead of propagating.

        Args:
            token: Hugging Face token; falls back to the ``HF_TOKEN``
                environment variable when not provided
        """
        try:
            self._add_log("[INFO] Starting ingestion process")

            # Check if the configuration file exists
            if not os.path.exists(self.config_path):
                raise FileNotFoundError(f"Configuration file does not exist: {self.config_path}")

            # Examine the configuration to get information (best-effort only:
            # a failure here is logged as a warning and does not abort the run)
            try:
                ingestion = self._load_ingestion_config()

                # Get source and destination paths
                source_dir = ingestion.get("source_documents_dir", "")
                output_dir = ingestion.get("output_dir", "")
                if source_dir:
                    self._add_log(f"[INFO] Source directory: {source_dir}")
                if output_dir:
                    self._add_log(f"[INFO] Output directory: {output_dir}")

                # List files to process if the directory exists
                if source_dir and os.path.exists(source_dir):
                    files = os.listdir(source_dir)
                    if files:
                        self._add_log(f"[INFO] Files to process: {', '.join(files)}")
                    else:
                        self._add_log("[WARN] No files found in source directory")
            except Exception as e:
                self._add_log(f"[WARN] Unable to read configuration: {str(e)}")

            # Environment preparation, including authentication variables
            env = self._build_env(token)
            if env.get("HF_TOKEN"):
                self._add_log("[INFO] Environment variables HF_TOKEN, HUGGING_FACE_HUB_TOKEN and HF_ORGANIZATION exported")

            # In development mode, only simulate ingestion
            if os.environ.get("DEVELOPMENT_MODE", "").lower() == "true":
                self._add_log("[INFO] Development mode enabled, simulating ingestion")
                self._simulate_ingestion_process()
                return

            # Start the process
            self._add_log(f"[INFO] Executing command: {' '.join(self.command)}")
            self.process = subprocess.Popen(
                self.command,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                # An undecodable byte must not abort the capture loop
                errors="replace",
                bufsize=1,
                env=env,
            )

            # Mark the process as running
            self.is_running_flag.set()

            # Start a thread to capture output
            output_thread = threading.Thread(target=self._capture_output)
            output_thread.daemon = True
            output_thread.start()

            self._add_log(f"[INFO] Process started with PID: {self.process.pid}")
        except Exception as e:
            self._add_log(f"[ERROR] Error starting ingestion process: {str(e)}")
            self.is_completed = True

    def _simulate_ingestion_process(self) -> None:
        """
        Simulate the ingestion process for testing/development
        This will be removed in production
        """
        # This method is just to simulate logs during development
        # It will be removed in production
        # Daemon thread, like the output-capture thread, so a pending
        # simulation never blocks interpreter shutdown.
        simulation_thread = threading.Thread(target=self._simulate_logs)
        simulation_thread.daemon = True
        simulation_thread.start()

    def _simulate_logs(self) -> None:
        """
        Simulate logs for testing/development
        This will be used when yourbench isn't installed or in development mode

        Emits a fixed sequence of log messages separated by short sleeps and,
        when the configuration defines an output directory, writes a
        placeholder markdown file for each source file. The task is always
        marked as completed when the method exits.
        """
        try:
            # Log simulation (used when yourbench is not available)
            self._add_log("[INFO] Simulation mode enabled (yourbench is not actually running)")

            # Get filenames from source directory
            source_files = []
            ingestion: Dict[str, Any] = {}
            try:
                ingestion = self._load_ingestion_config()
                source_dir = ingestion.get("source_documents_dir", "")
                if source_dir and os.path.exists(source_dir):
                    source_files = [
                        name for name in os.listdir(source_dir)
                        if os.path.isfile(os.path.join(source_dir, name))
                    ]
            except Exception:
                source_files = ["document.pdf", "document.txt"]  # Fallback

            # Create output directory if it doesn't exist
            output_dir = ""
            try:
                output_dir = ingestion.get("output_dir", "")
                if output_dir:
                    os.makedirs(output_dir, exist_ok=True)
            except Exception as e:
                # Best-effort: the per-file writes below report their own errors
                self._add_log(f"[WARN] Unable to create output directory: {str(e)}")

            # Simulate file processing
            time.sleep(1)
            self._add_log("[INFO] Initializing document ingestion")
            time.sleep(1.5)
            self._add_log("[INFO] Loading configuration parameters")
            time.sleep(1)
            self._add_log("[INFO] Verifying source files")

            # Process each file
            for file in source_files:
                time.sleep(1.5)
                self._add_log(f"[INFO] Processing file: {file}")
                time.sleep(2)
                self._add_log(f"[INFO] Extracting content from {file}")
                time.sleep(1.5)
                self._add_log(f"[INFO] Converting to markdown: {file}")

                # Create a simulated markdown file if an output directory is defined
                if output_dir:
                    base_name = os.path.splitext(file)[0]
                    output_file = os.path.join(output_dir, f"{base_name}.md")
                    try:
                        with open(output_file, "w", encoding="utf-8") as f:
                            f.write(f"# {base_name}\n\n")
                            f.write("This is a markdown document automatically generated by the simulation.\n\n")
                            f.write("## Section 1\n\n")
                            f.write("Content of section 1...\n\n")
                            f.write("## Section 2\n\n")
                            f.write("Content of section 2...\n\n")
                        self._add_log(f"[INFO] Markdown file created: {output_file}")
                    except Exception as e:
                        self._add_log(f"[ERROR] Error creating markdown file: {str(e)}")

            time.sleep(2)
            self._add_log("[INFO] Finalizing processing")
            time.sleep(1)
            self._add_log("[SUCCESS] Stage completed: ingestion")
            time.sleep(0.5)
            self._add_log("[SUCCESS] Ingestion completed successfully")
        finally:
            # Mark task as completed, even if the simulation failed part-way,
            # so that pollers of is_task_completed() do not wait forever
            self.is_completed = True