qLeaderboard-aBase4Community / evaluation_queue.py
Quazim0t0's picture
Upload 16 files
9a46619 verified
raw
history blame
42.4 kB
"""
Model evaluation queue system for Dynamic Highscores.
This module handles the evaluation queue, CPU-only processing,
and enforces daily submission limits for users.
"""
import os
import json
import time
import threading
import queue as queue_module
from datetime import datetime, timedelta
import gradio as gr
from huggingface_hub import HfApi, hf_hub_download, snapshot_download
from datasets import load_dataset
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
import sqlite3
class EvaluationQueue:
"""Manages the evaluation queue for model benchmarking."""
def __init__(self, db_manager, auth_manager):
"""Initialize the evaluation queue manager.
Args:
db_manager: Database manager instance
auth_manager: Authentication manager instance
"""
self.db_manager = db_manager
self.auth_manager = auth_manager
self.hf_api = HfApi()
self.queue = queue_module.Queue()
self.is_processing = False
self.worker_thread = None
self.model_tags = ["Merge", "Agent", "Reasoning", "Coding", "General", "Specialized", "Instruction", "Chat"]
self.current_evaluation = None
self.progress = 0
self.progress_lock = threading.Lock()
# Memory limit for models in GB (leave 2GB for system)
self.memory_limit_gb = 14.0
def start_worker(self):
"""Start the worker thread for processing the evaluation queue."""
if self.worker_thread is None or not self.worker_thread.is_alive():
self.is_processing = True
self.worker_thread = threading.Thread(target=self._process_queue)
self.worker_thread.daemon = True
self.worker_thread.start()
def stop_worker(self):
"""Stop the worker thread."""
self.is_processing = False
if self.worker_thread and self.worker_thread.is_alive():
self.worker_thread.join(timeout=1.0)
def check_model_size(self, model_id):
"""Check if a model will fit within RAM limitations.
Args:
model_id: HuggingFace model ID
Returns:
tuple: (will_fit, message)
"""
try:
# Query model info from the HuggingFace API
model_info_obj = self.hf_api.model_info(model_id)
# Initialize total size
total_size_gb = 0
# Try different approaches to get model size based on API response structure
if hasattr(model_info_obj, 'safetensors') and model_info_obj.safetensors:
# New API format with safetensors dict
for file_info in model_info_obj.safetensors.values():
if hasattr(file_info, 'size'):
total_size_gb += file_info.size / (1024 * 1024 * 1024)
elif isinstance(file_info, dict) and 'size' in file_info:
total_size_gb += file_info['size'] / (1024 * 1024 * 1024)
# Fallback to siblings method
if total_size_gb == 0 and hasattr(model_info_obj, 'siblings'):
for sibling in model_info_obj.siblings:
if hasattr(sibling, 'size'):
if sibling.rfilename.endswith(('.bin', '.safetensors', '.pt')):
total_size_gb += sibling.size / (1024 * 1024 * 1024)
elif isinstance(sibling, dict) and 'size' in sibling:
if sibling.get('rfilename', '').endswith(('.bin', '.safetensors', '.pt')):
total_size_gb += sibling['size'] / (1024 * 1024 * 1024)
# If we still couldn't determine size, try a reasonable guess based on model name
if total_size_gb == 0:
# Try to guess from model name (e.g., if it has "7b" in the name)
model_name = model_id.lower()
size_indicators = {
"1b": 1, "2b": 2, "3b": 3, "5b": 5, "7b": 7, "8b": 8,
"10b": 10, "13b": 13, "20b": 20, "30b": 30, "65b": 65, "70b": 70
}
for indicator, size in size_indicators.items():
if indicator in model_name.replace("-", "").replace("_", ""):
total_size_gb = size * 2 # Rough estimate: param count × 2 for size in GB
break
# If we still couldn't determine size, use a default
if total_size_gb == 0:
# Try direct API method
try:
print(f"Checking model size with direct method for {model_id}")
# Print out the entire structure for debugging
print(f"Model info: {model_info_obj.__dict__}")
# Default to a conservative estimate
total_size_gb = 5 # Assume a 5GB model as default
except Exception as e:
print(f"Direct size check failed: {e}")
return True, "Unable to determine model size accurately, but allowing submission with caution"
# Account for memory overhead
estimated_ram_needed = total_size_gb * 1.3 # 30% overhead
# Check against limit
if estimated_ram_needed > self.memory_limit_gb:
return False, f"Model is too large (approximately {total_size_gb:.1f}GB, needs {estimated_ram_needed:.1f}GB RAM). Maximum allowed is {self.memory_limit_gb}GB."
return True, f"Model size check passed ({total_size_gb:.1f}GB, estimated {estimated_ram_needed:.1f}GB RAM usage)"
except Exception as e:
print(f"Model size check error: {e}")
# Log more details for debugging
import traceback
traceback.print_exc()
# Allow submission with warning
return True, f"Warning: Could not verify model size ({str(e)}). Please ensure your model is under {self.memory_limit_gb}GB."
def _process_queue(self):
"""Process the evaluation queue in a separate thread."""
while self.is_processing:
try:
# Get the next evaluation from the database
pending_evals = self.db_manager.get_evaluation_results(status="pending")
if pending_evals:
# Sort by priority and added_at
next_eval = pending_evals[0]
# Update status to running
self.db_manager.update_evaluation_status(next_eval['id'], 'running')
# Set current evaluation and reset progress
with self.progress_lock:
self.current_evaluation = next_eval
self.progress = 0
try:
# Get model and benchmark details
model_info = self.db_manager.get_model(next_eval['model_id'])
benchmark_info = self.db_manager.get_benchmark(next_eval['benchmark_id'])
if model_info and benchmark_info:
# Check if model will fit in memory
will_fit, message = self.check_model_size(model_info['hf_model_id'])
if not will_fit:
raise Exception(f"Model too large for evaluation: {message}")
# Run the evaluation
results = self._run_evaluation(
model_info['hf_model_id'],
benchmark_info['dataset_id']
)
# Calculate overall score
score = self._calculate_overall_score(results)
# Update status to completed with results
self.db_manager.update_evaluation_status(
next_eval['id'],
'completed',
results=results,
score=score
)
else:
raise Exception("Model or benchmark not found")
except Exception as e:
print(f"Evaluation error: {e}")
# Update status to failed with error message
error_results = {"error": str(e)}
self.db_manager.update_evaluation_status(
next_eval['id'],
'failed',
results=error_results
)
# Clear current evaluation
with self.progress_lock:
self.current_evaluation = None
self.progress = 0
else:
# No evaluations in queue, sleep for a bit
time.sleep(5)
except Exception as e:
print(f"Queue processing error: {e}")
time.sleep(5)
def _run_evaluation(self, model_id, dataset_id):
"""Run an evaluation for a model on a benchmark.
Args:
model_id: HuggingFace model ID
dataset_id: HuggingFace dataset ID (with optional config)
Returns:
dict: Evaluation results
"""
# Update progress
with self.progress_lock:
self.progress = 5 # Starting evaluation
# Parse dataset ID and config
if ":" in dataset_id:
dataset_id, config = dataset_id.split(":", 1)
else:
config = None
# Update progress
with self.progress_lock:
self.progress = 10 # Loading dataset
# Load the dataset
try:
if config:
dataset = load_dataset(dataset_id, config, split="test")
else:
dataset = load_dataset(dataset_id, split="test")
except Exception as e:
return {"error": f"Failed to load dataset: {str(e)}"}
# Update progress
with self.progress_lock:
self.progress = 20 # Loading model
try:
# Load the model with memory optimization settings
device = "cpu"
model = AutoModelForCausalLM.from_pretrained(
model_id,
device_map=device,
torch_dtype=torch.float32, # Use float32 for CPU
low_cpu_mem_usage=True, # Enable memory optimization
offload_folder="offload", # Enable offloading if needed
offload_state_dict=True, # Offload state dict for memory saving
max_memory={0: f"{self.memory_limit_gb}GB"} # Limit memory usage
)
tokenizer = AutoTokenizer.from_pretrained(model_id)
except Exception as e:
print(f"Model loading error: {e}")
return {"error": f"Failed to load model: {str(e)}"}
# Update progress
with self.progress_lock:
self.progress = 30 # Determining task type
# Determine task type based on dataset features
task_type = self._determine_task_type(dataset)
# Update progress
with self.progress_lock:
self.progress = 40 # Starting evaluation
try:
# Run appropriate evaluation based on task type
if task_type == "text-generation":
results = self._evaluate_text_generation(model, tokenizer, dataset)
elif task_type == "question-answering":
results = self._evaluate_question_answering(model, tokenizer, dataset)
elif task_type == "classification":
results = self._evaluate_classification(model, tokenizer, dataset)
elif task_type == "code-generation":
results = self._evaluate_code_generation(model, tokenizer, dataset)
else:
# Default to general evaluation
results = self._evaluate_general(model, tokenizer, dataset)
except Exception as e:
print(f"Evaluation task error: {e}")
return {"error": f"Evaluation failed: {str(e)}"}
# Update progress
with self.progress_lock:
self.progress = 95 # Cleaning up
# Clean up to free memory
del model
del tokenizer
if torch.cuda.is_available():
torch.cuda.empty_cache()
# Update progress
with self.progress_lock:
self.progress = 100 # Completed
return results
def get_current_progress(self):
"""Get the current evaluation progress.
Returns:
tuple: (current_evaluation, progress_percentage)
"""
with self.progress_lock:
return self.current_evaluation, self.progress
def _determine_task_type(self, dataset):
"""Determine the task type based on dataset features.
Args:
dataset: HuggingFace dataset
Returns:
str: Task type
"""
features = dataset.features
# Check for common feature patterns
if "question" in features and "answer" in features:
return "question-answering"
elif "code" in features or "solution" in features:
return "code-generation"
elif "label" in features or "class" in features:
return "classification"
elif "input" in features and "output" in features:
return "text-generation"
else:
return "general"
def _evaluate_text_generation(self, model, tokenizer, dataset):
"""Evaluate a model on text generation tasks.
Args:
model: HuggingFace model
tokenizer: HuggingFace tokenizer
dataset: HuggingFace dataset
Returns:
dict: Evaluation results
"""
# Set up generation pipeline
generator = pipeline(
"text-generation",
model=model,
tokenizer=tokenizer,
device="cpu"
)
# Sample a subset for evaluation (to keep runtime reasonable)
if len(dataset) > 100:
dataset = dataset.select(range(100))
# Track metrics
correct = 0
total = 0
generated_texts = []
# Process each example
for i, example in enumerate(dataset):
# Update progress based on completion percentage
with self.progress_lock:
self.progress = 40 + int((i / len(dataset)) * 50)
input_text = example.get("input", example.get("prompt", ""))
expected_output = example.get("output", example.get("target", ""))
if not input_text or not expected_output:
continue
# Generate text
generated = generator(
input_text,
max_length=100,
num_return_sequences=1
)
generated_text = generated[0]["generated_text"]
generated_texts.append(generated_text)
# Simple exact match check
if expected_output.strip() in generated_text:
correct += 1
total += 1
# Calculate metrics
accuracy = correct / total if total > 0 else 0
return {
"accuracy": accuracy,
"samples_evaluated": total,
"generated_samples": generated_texts[:5] # Include a few samples
}
def _evaluate_question_answering(self, model, tokenizer, dataset):
"""Evaluate a model on question answering tasks.
Args:
model: HuggingFace model
tokenizer: HuggingFace tokenizer
dataset: HuggingFace dataset
Returns:
dict: Evaluation results
"""
# Set up QA pipeline
qa_pipeline = pipeline(
"question-answering",
model=model,
tokenizer=tokenizer,
device="cpu"
)
# Sample a subset for evaluation
if len(dataset) > 100:
dataset = dataset.select(range(100))
# Track metrics
exact_matches = 0
f1_scores = []
total = 0
# Process each example
for i, example in enumerate(dataset):
# Update progress based on completion percentage
with self.progress_lock:
self.progress = 40 + int((i / len(dataset)) * 50)
question = example.get("question", "")
context = example.get("context", "")
answer = example.get("answer", "")
if not question or not answer:
continue
# Get model prediction
if context:
result = qa_pipeline(question=question, context=context)
else:
# If no context provided, use the question as context
result = qa_pipeline(question=question, context=question)
predicted_answer = result["answer"]
# Calculate exact match
if predicted_answer.strip() == answer.strip():
exact_matches += 1
# Calculate F1 score
f1 = self._calculate_f1(answer, predicted_answer)
f1_scores.append(f1)
total += 1
# Calculate metrics
exact_match_accuracy = exact_matches / total if total > 0 else 0
avg_f1 = sum(f1_scores) / len(f1_scores) if f1_scores else 0
return {
"exact_match": exact_match_accuracy,
"f1": avg_f1,
"samples_evaluated": total
}
def _evaluate_classification(self, model, tokenizer, dataset):
"""Evaluate a model on classification tasks.
Args:
model: HuggingFace model
tokenizer: HuggingFace tokenizer
dataset: HuggingFace dataset
Returns:
dict: Evaluation results
"""
# Set up classification pipeline
classifier = pipeline(
"text-classification",
model=model,
tokenizer=tokenizer,
device="cpu"
)
# Sample a subset for evaluation
if len(dataset) > 100:
dataset = dataset.select(range(100))
# Track metrics
correct = 0
total = 0
# Process each example
for i, example in enumerate(dataset):
# Update progress based on completion percentage
with self.progress_lock:
self.progress = 40 + int((i / len(dataset)) * 50)
text = example.get("text", example.get("sentence", ""))
label = str(example.get("label", example.get("class", "")))
if not text or not label:
continue
# Get model prediction
result = classifier(text)
predicted_label = result[0]["label"]
# Check if correct
if str(predicted_label) == label:
correct += 1
total += 1
# Calculate metrics
accuracy = correct / total if total > 0 else 0
return {
"accuracy": accuracy,
"samples_evaluated": total
}
def _evaluate_code_generation(self, model, tokenizer, dataset):
"""Evaluate a model on code generation tasks.
Args:
model: HuggingFace model
tokenizer: HuggingFace tokenizer
dataset: HuggingFace dataset
Returns:
dict: Evaluation results
"""
# Set up generation pipeline
generator = pipeline(
"text-generation",
model=model,
tokenizer=tokenizer,
device="cpu"
)
# Sample a subset for evaluation
if len(dataset) > 50: # Smaller sample for code tasks
dataset = dataset.select(range(50))
# Track metrics
exact_matches = 0
functional_matches = 0
total = 0
# Process each example
for i, example in enumerate(dataset):
# Update progress based on completion percentage
with self.progress_lock:
self.progress = 40 + int((i / len(dataset)) * 50)
prompt = example.get("prompt", example.get("input", ""))
solution = example.get("solution", example.get("output", ""))
if not prompt or not solution:
continue
# Generate code
generated = generator(
prompt,
max_length=200,
num_return_sequences=1
)
generated_code = generated[0]["generated_text"]
# Extract code from generated text (remove prompt)
if prompt in generated_code:
generated_code = generated_code[len(prompt):].strip()
# Check exact match
if generated_code.strip() == solution.strip():
exact_matches += 1
functional_matches += 1
else:
# We would ideally check functional correctness here
# but that requires executing code which is complex and potentially unsafe
# For now, we'll use a simple heuristic
if len(generated_code) > 0 and any(keyword in generated_code for keyword in ["def ", "function", "return", "class"]):
functional_matches += 0.5 # Partial credit
total += 1
# Calculate metrics
exact_match_rate = exact_matches / total if total > 0 else 0
functional_correctness = functional_matches / total if total > 0 else 0
return {
"exact_match": exact_match_rate,
"functional_correctness": functional_correctness,
"samples_evaluated": total
}
def _evaluate_general(self, model, tokenizer, dataset):
"""General evaluation for any dataset type.
Args:
model: HuggingFace model
tokenizer: HuggingFace tokenizer
dataset: HuggingFace dataset
Returns:
dict: Evaluation results
"""
# Set up generation pipeline
generator = pipeline(
"text-generation",
model=model,
tokenizer=tokenizer,
device="cpu"
)
# Sample a subset for evaluation
if len(dataset) > 50:
dataset = dataset.select(range(50))
# Find input and output fields
features = dataset.features
input_field = None
output_field = None
for field in features:
if field.lower() in ["input", "prompt", "question", "text"]:
input_field = field
elif field.lower() in ["output", "target", "answer", "response"]:
output_field = field
if not input_field:
# Just use the first string field as input
for field in features:
if isinstance(features[field], (str, list)):
input_field = field
break
# Track metrics
total = 0
generated_texts = []
# Process each example
for i, example in enumerate(dataset):
# Update progress based on completion percentage
with self.progress_lock:
self.progress = 40 + int((i / len(dataset)) * 50)
if input_field and input_field in example:
input_text = str(example[input_field])
# Generate text
generated = generator(
input_text,
max_length=100,
num_return_sequences=1
)
generated_text = generated[0]["generated_text"]
generated_texts.append({
"input": input_text,
"output": generated_text,
"expected": str(example[output_field]) if output_field and output_field in example else "N/A"
})
total += 1
return {
"samples_evaluated": total,
"generated_samples": generated_texts[:5] # Include a few samples
}
def _calculate_f1(self, answer, prediction):
"""Calculate F1 score between answer and prediction.
Args:
answer: Ground truth answer
prediction: Model prediction
Returns:
float: F1 score
"""
# Tokenize
answer_tokens = answer.lower().split()
prediction_tokens = prediction.lower().split()
# Calculate precision and recall
common_tokens = set(answer_tokens) & set(prediction_tokens)
if not common_tokens:
return 0.0
precision = len(common_tokens) / len(prediction_tokens)
recall = len(common_tokens) / len(answer_tokens)
# Calculate F1
if precision + recall == 0:
return 0.0
f1 = 2 * precision * recall / (precision + recall)
return f1
def _calculate_overall_score(self, results):
"""Calculate an overall score from evaluation results.
Args:
results: Evaluation results dictionary
Returns:
float: Overall score between 0 and 100
"""
# If there was an error, return a low score
if "error" in results:
return 0.0
score = 0.0
# Check for common metrics and weight them
if "accuracy" in results:
score += results["accuracy"] * 100
if "exact_match" in results:
score += results["exact_match"] * 100
if "f1" in results:
score += results["f1"] * 100
if "functional_correctness" in results:
score += results["functional_correctness"] * 100
# If multiple metrics were found, average them
num_metrics = sum(1 for metric in ["accuracy", "exact_match", "f1", "functional_correctness"] if metric in results)
if num_metrics > 0:
score /= num_metrics
else:
# Default score if no metrics available
score = 50.0
return score
def submit_evaluation(self, model_id, benchmark_id, user_id, priority=0):
"""Submit a model for evaluation on a benchmark.
Args:
model_id: Model ID in the database
benchmark_id: Benchmark ID in the database
user_id: User ID submitting the evaluation
priority: Queue priority (higher = higher priority)
Returns:
tuple: (evaluation_id, message)
"""
# Check if user can submit today
if not self.auth_manager.can_submit_benchmark(user_id):
return None, "Daily submission limit reached. Try again tomorrow."
try:
# Get model HuggingFace ID to check size
model_info = self.db_manager.get_model(model_id)
if not model_info:
return None, "Model not found in database."
# Check if model will fit in memory
will_fit, message = self.check_model_size(model_info['hf_model_id'])
if not will_fit:
return None, message
# Add evaluation to database and queue
evaluation_id = self.db_manager.add_evaluation(
model_id=model_id,
benchmark_id=benchmark_id,
priority=priority
)
# Update user's last submission date
self.auth_manager.update_submission_date(user_id)
# Make sure worker is running
self.start_worker()
return evaluation_id, f"Evaluation submitted successfully. {message}"
except Exception as e:
print(f"Submit evaluation error: {e}")
return None, f"Failed to submit evaluation: {str(e)}"
def get_queue_status(self):
"""Get the current status of the evaluation queue.
Returns:
dict: Queue status information
"""
try:
# Get evaluations from database
pending_evals = self.db_manager.get_evaluation_results(status="pending")
running_evals = self.db_manager.get_evaluation_results(status="running")
completed_evals = self.db_manager.get_evaluation_results(status="completed")
failed_evals = self.db_manager.get_evaluation_results(status="failed")
# Get current evaluation progress
current_eval, progress = self.get_current_progress()
return {
"pending": len(pending_evals),
"running": len(running_evals),
"completed": len(completed_evals),
"failed": len(failed_evals),
"is_processing": self.is_processing,
"current_evaluation": current_eval,
"progress": progress,
"memory_limit_gb": self.memory_limit_gb
}
except Exception as e:
print(f"Queue status error: {e}")
return {
"pending": 0,
"running": 0,
"completed": 0,
"failed": 0,
"is_processing": self.is_processing,
"current_evaluation": None,
"progress": 0,
"memory_limit_gb": self.memory_limit_gb,
"error": str(e)
}
# Model submission UI components
def create_model_submission_ui(evaluation_queue, auth_manager, db_manager):
"""Create the model submission UI components.
Args:
evaluation_queue: Evaluation queue instance
auth_manager: Authentication manager instance
db_manager: Database manager instance
Returns:
gr.Blocks: Gradio Blocks component with model submission UI
"""
with gr.Blocks() as submission_ui:
# Store user authentication state
user_state = gr.State(None)
# Check authentication on load
def check_auth_on_load(request: gr.Request):
if request:
# Special handling for HF Spaces OAuth
if 'SPACE_ID' in os.environ:
username = request.headers.get("HF-User")
if username:
user = db_manager.get_user_by_username(username)
if user:
print(f"User authenticated via HF Spaces OAuth: {username}")
return user
else:
# Standard token-based auth
user = auth_manager.check_login(request)
if user:
return user
return None
with gr.Tab("Submit Model"):
gr.Markdown(f"""
### Model Size Restrictions
Models must fit within {evaluation_queue.memory_limit_gb}GB of RAM for evaluation.
Large models will be rejected to ensure all evaluations can complete successfully.
""", elem_classes=["info-text"])
with gr.Row():
with gr.Column(scale=2):
model_id_input = gr.Textbox(
placeholder="HuggingFace model ID (e.g., 'gpt2', 'facebook/opt-350m')",
label="Model ID"
)
check_size_button = gr.Button("Check Model Size")
size_check_result = gr.Markdown("")
model_name_input = gr.Textbox(
placeholder="Display name for your model",
label="Model Name"
)
model_description_input = gr.Textbox(
placeholder="Brief description of your model",
label="Description",
lines=3
)
model_parameters_input = gr.Number(
label="Number of Parameters (billions)",
precision=2
)
with gr.Column(scale=1):
model_tag_input = gr.Dropdown(
choices=evaluation_queue.model_tags,
label="Model Tag",
info="Select one category that best describes your model"
)
# Fixed benchmark dropdown to properly show names
benchmark_dropdown = gr.Dropdown(
label="Benchmark",
info="Select a benchmark to evaluate your model on",
choices=[("none", "Loading benchmarks...")],
value=None
)
refresh_benchmarks_button = gr.Button("Refresh Benchmarks")
submit_model_button = gr.Button("Submit for Evaluation")
submission_status = gr.Markdown("")
auth_message = gr.Markdown("")
with gr.Tab("Evaluation Queue"):
refresh_queue_button = gr.Button("Refresh Queue")
with gr.Row():
with gr.Column(scale=1):
queue_stats = gr.JSON(
label="Queue Statistics"
)
with gr.Column(scale=2):
queue_status = gr.Dataframe(
headers=["ID", "Model", "Benchmark", "Status", "Submitted"],
label="Recent Evaluations"
)
with gr.Row(visible=True) as progress_container:
with gr.Column():
current_eval_info = gr.Markdown("No evaluation currently running")
# Use a simple text display for progress instead of Progress component
progress_display = gr.Markdown("Progress: 0%")
# Event handlers
def check_model_size_handler(model_id):
if not model_id:
return "Please enter a HuggingFace model ID."
try:
will_fit, message = evaluation_queue.check_model_size(model_id)
if will_fit:
return f"✅ {message}"
else:
return f"❌ {message}"
except Exception as e:
print(f"Model size check error: {e}")
import traceback
traceback.print_exc()
return f"Error checking model size: {str(e)}"
def refresh_benchmarks_handler():
benchmarks = db_manager.get_benchmarks()
# Format for dropdown - properly formatted to display names
choices = []
for b in benchmarks:
# Add as tuple of (id, name) to ensure proper display
choices.append((str(b["id"]), b["name"]))
if not choices:
choices = [("none", "No benchmarks available - add some first")]
return gr.update(choices=choices)
def submit_model_handler(model_id, model_name, model_description, model_parameters, model_tag, benchmark_id, user):
# Check if user is logged in
if not user:
return "Please log in to submit a model."
if not model_id or not model_name or not model_tag or not benchmark_id:
return "Please fill in all required fields."
if benchmark_id == "none":
return "Please select a valid benchmark."
try:
# Check if model will fit in RAM
will_fit, size_message = evaluation_queue.check_model_size(model_id)
if not will_fit:
return f"❌ {size_message}"
# Add model to database
model_db_id = db_manager.add_model(
name=model_name,
hf_model_id=model_id,
user_id=user["id"],
tag=model_tag,
parameters=str(model_parameters) if model_parameters else None,
description=model_description
)
if not model_db_id:
return "Failed to add model to database."
# Submit for evaluation
eval_id, message = evaluation_queue.submit_evaluation(
model_id=model_db_id,
benchmark_id=benchmark_id,
user_id=user["id"]
)
if eval_id:
return f"✅ Model submitted successfully. {size_message}\nEvaluation ID: {eval_id}"
else:
return message
except Exception as e:
print(f"Error submitting model: {str(e)}")
import traceback
traceback.print_exc()
return f"Error submitting model: {str(e)}"
def refresh_queue_handler():
# Get queue statistics
stats = evaluation_queue.get_queue_status()
# Get recent evaluations (all statuses, limited to 20)
evals = db_manager.get_evaluation_results(limit=20)
# Format for dataframe
eval_data = []
for eval in evals:
eval_data.append([
eval["id"],
eval["model_name"],
eval["benchmark_name"],
eval["status"],
eval["submitted_at"]
])
# Also update progress display
current_eval, progress = evaluation_queue.get_current_progress()
if current_eval:
model_info = db_manager.get_model(current_eval['model_id'])
benchmark_info = db_manager.get_benchmark(current_eval['benchmark_id'])
if model_info and benchmark_info:
eval_info = f"**Currently Evaluating:** {model_info['name']} on {benchmark_info['name']}"
progress_text = f"Progress: {progress}%"
return stats, eval_data, eval_info, progress_text
return stats, eval_data, "No evaluation currently running", "Progress: 0%"
# Update authentication status
def update_auth_message(user):
if user:
return f"Logged in as {user['username']}"
else:
return "Please log in to submit a model."
# Connect event handlers
check_size_button.click(
fn=check_model_size_handler,
inputs=[model_id_input],
outputs=[size_check_result]
)
refresh_benchmarks_button.click(
fn=refresh_benchmarks_handler,
inputs=[],
outputs=[benchmark_dropdown]
)
submit_model_button.click(
fn=submit_model_handler,
inputs=[
model_id_input,
model_name_input,
model_description_input,
model_parameters_input,
model_tag_input,
benchmark_dropdown,
user_state
],
outputs=[submission_status]
)
refresh_queue_button.click(
fn=refresh_queue_handler,
inputs=[],
outputs=[queue_stats, queue_status, current_eval_info, progress_display]
)
# Initialize on load
submission_ui.load(
fn=check_auth_on_load,
inputs=[],
outputs=[user_state]
)
submission_ui.load(
fn=lambda user: update_auth_message(user),
inputs=[user_state],
outputs=[auth_message]
)
submission_ui.load(
fn=refresh_benchmarks_handler,
inputs=[],
outputs=[benchmark_dropdown]
)
submission_ui.load(
fn=refresh_queue_handler,
inputs=[],
outputs=[queue_stats, queue_status, current_eval_info, progress_display]
)
return submission_ui