# qLeaderboard-aBase4Community / evaluation_queue.py
# Last change: "Update evaluation_queue.py" by Quazim0t0 (commit ecb6742, verified)
# File size: 32.2 kB
"""
Model evaluation queue system for Dynamic Highscores.
This module handles the evaluation queue, CPU-only processing,
and enforces daily submission limits for users.
"""
import os
import json
import time
import threading
import queue as queue_module
from datetime import datetime, timedelta
import gradio as gr
from huggingface_hub import HfApi, hf_hub_download, snapshot_download
from datasets import load_dataset
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
import sqlite3
class EvaluationQueue:
    """Manages the evaluation queue for model benchmarking.

    Pending evaluations are read from the database by a background worker
    thread, executed one at a time on CPU, and written back with their
    results. Progress of the evaluation currently running is exposed through
    ``get_current_progress`` and is guarded by ``progress_lock``.
    """

    def __init__(self, db_manager, auth_manager):
        """Initialize the evaluation queue manager.

        Args:
            db_manager: Database manager instance
            auth_manager: Authentication manager instance
        """
        self.db_manager = db_manager
        self.auth_manager = auth_manager
        self.hf_api = HfApi()
        self.queue = queue_module.Queue()
        self.is_processing = False
        self.worker_thread = None
        self.model_tags = ["Merge", "Agent", "Reasoning", "Coding", "General", "Specialized", "Instruction", "Chat"]
        self.current_evaluation = None
        self.progress = 0
        self.progress_lock = threading.Lock()

    def start_worker(self):
        """Start the worker thread for processing the evaluation queue.

        Does nothing when a worker thread already exists and is alive.
        """
        if self.worker_thread is None or not self.worker_thread.is_alive():
            self.is_processing = True
            self.worker_thread = threading.Thread(target=self._process_queue)
            self.worker_thread.daemon = True
            self.worker_thread.start()

    def stop_worker(self):
        """Stop the worker thread.

        Clears the processing flag and waits up to one second for the worker
        to exit; the worker only notices the flag between loop iterations.
        """
        self.is_processing = False
        if self.worker_thread and self.worker_thread.is_alive():
            self.worker_thread.join(timeout=1.0)

    def _process_queue(self):
        """Process the evaluation queue in a separate thread.

        Loops while ``is_processing`` is true. Each iteration takes the first
        pending evaluation, marks it ``running``, executes it and records
        ``completed`` or ``failed``. Sleeps five seconds when nothing is
        pending or when queue handling itself raises.
        """
        while self.is_processing:
            try:
                # Get the next evaluation from the database
                pending_evals = self.db_manager.get_evaluation_results(status="pending")
                if not pending_evals:
                    # No evaluations in queue, sleep for a bit
                    time.sleep(5)
                    continue

                # NOTE(review): no sorting happens here; the first row is
                # taken as-is, so priority/added_at ordering is presumably
                # done by db_manager - confirm.
                next_eval = pending_evals[0]

                # Update status to running
                self.db_manager.update_evaluation_status(next_eval['id'], 'running')

                # Set current evaluation and reset progress
                with self.progress_lock:
                    self.current_evaluation = next_eval
                    self.progress = 0

                try:
                    self._execute_evaluation(next_eval)
                except Exception as e:
                    print(f"Evaluation error: {e}")
                    # Update status to failed
                    self.db_manager.update_evaluation_status(next_eval['id'], 'failed')
                finally:
                    # Always clear the current evaluation, even when writing
                    # the 'failed' status itself raises.
                    with self.progress_lock:
                        self.current_evaluation = None
                        self.progress = 0
            except Exception as e:
                print(f"Queue processing error: {e}")
                time.sleep(5)

    def _execute_evaluation(self, evaluation):
        """Run one queued evaluation and store its results as completed.

        Args:
            evaluation: Evaluation record with ``id``, ``model_id`` and
                ``benchmark_id`` entries

        Raises:
            LookupError: If the model or benchmark record is not found
        """
        # Get model and benchmark details
        model_info = self.db_manager.get_model(evaluation['model_id'])
        benchmark_info = self.db_manager.get_benchmark(evaluation['benchmark_id'])
        if not (model_info and benchmark_info):
            raise LookupError("Model or benchmark not found")

        # Run the evaluation
        results = self._run_evaluation(
            model_info['hf_model_id'],
            benchmark_info['dataset_id']
        )

        # Calculate overall score
        score = self._calculate_overall_score(results)

        # Update status to completed with results
        self.db_manager.update_evaluation_status(
            evaluation['id'],
            'completed',
            results=results,
            score=score
        )

    def _set_progress(self, value):
        """Store a new progress percentage under the progress lock."""
        with self.progress_lock:
            self.progress = value

    def _report_loop_progress(self, index, total):
        """Map position ``index`` of ``total`` examples onto the 40-90% band."""
        self._set_progress(40 + int((index / total) * 50))

    @staticmethod
    def _limit_samples(dataset, limit):
        """Return the first ``limit`` rows of ``dataset`` when it is larger."""
        if len(dataset) > limit:
            return dataset.select(range(limit))
        return dataset

    @staticmethod
    def _strip_prompt(prompt, generated_text):
        """Remove a leading echo of ``prompt`` from ``generated_text``.

        The prompt is only cut off when the text really starts with it; a
        prompt that merely occurs somewhere inside the text leaves the text
        unchanged instead of chopping off unrelated leading characters.
        """
        if generated_text.startswith(prompt):
            return generated_text[len(prompt):].strip()
        return generated_text

    @staticmethod
    def _is_string_feature(feature):
        """Tell whether a dataset feature descriptor can serve as text input.

        Accepts descriptors exposing a string ``dtype`` as well as the plain
        ``str``/``list`` values that were accepted before.
        """
        if isinstance(feature, (str, list)):
            return True
        return getattr(feature, "dtype", None) in ("string", "large_string")

    def _run_evaluation(self, model_id, dataset_id):
        """Run an evaluation for a model on a benchmark.

        Args:
            model_id: HuggingFace model ID
            dataset_id: HuggingFace dataset ID (with optional config,
                written as ``dataset:config``)

        Returns:
            dict: Evaluation results
        """
        self._set_progress(5)  # Starting evaluation

        # Parse dataset ID and config
        if ":" in dataset_id:
            dataset_id, config = dataset_id.split(":", 1)
        else:
            config = None

        self._set_progress(10)  # Loading dataset

        # Load the dataset (only the "test" split is evaluated)
        if config:
            dataset = load_dataset(dataset_id, config, split="test")
        else:
            dataset = load_dataset(dataset_id, split="test")

        self._set_progress(20)  # Loading model

        # Load the model (CPU only)
        device = "cpu"
        model = AutoModelForCausalLM.from_pretrained(
            model_id,
            device_map=device,
            torch_dtype=torch.float32,  # Use float32 for CPU
            low_cpu_mem_usage=True
        )
        tokenizer = AutoTokenizer.from_pretrained(model_id)

        self._set_progress(30)  # Determining task type

        # Determine task type based on dataset features
        task_type = self._determine_task_type(dataset)

        self._set_progress(40)  # Starting evaluation

        # Run appropriate evaluation based on task type; anything unknown
        # falls back to the general evaluation.
        evaluators = {
            "text-generation": self._evaluate_text_generation,
            "question-answering": self._evaluate_question_answering,
            "classification": self._evaluate_classification,
            "code-generation": self._evaluate_code_generation,
        }
        evaluate = evaluators.get(task_type, self._evaluate_general)
        results = evaluate(model, tokenizer, dataset)

        self._set_progress(95)  # Cleaning up

        # Clean up to free memory
        del model
        del tokenizer
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

        self._set_progress(100)  # Completed
        return results

    def get_current_progress(self):
        """Get the current evaluation progress.

        Returns:
            tuple: (current_evaluation, progress_percentage)
        """
        with self.progress_lock:
            return self.current_evaluation, self.progress

    def _determine_task_type(self, dataset):
        """Determine the task type based on dataset features.

        The checks run in a fixed order, so a dataset that matches several
        patterns is reported as the first one that matches.

        Args:
            dataset: HuggingFace dataset

        Returns:
            str: Task type
        """
        features = dataset.features

        # Check for common feature patterns
        if "question" in features and "answer" in features:
            return "question-answering"
        if "code" in features or "solution" in features:
            return "code-generation"
        if "label" in features or "class" in features:
            return "classification"
        if "input" in features and "output" in features:
            return "text-generation"
        return "general"

    def _evaluate_text_generation(self, model, tokenizer, dataset):
        """Evaluate a model on text generation tasks.

        An example counts as correct when the expected output occurs in the
        generated completion (the echoed prompt is excluded, so an answer
        that already appears in the prompt does not score).

        Args:
            model: HuggingFace model
            tokenizer: HuggingFace tokenizer
            dataset: HuggingFace dataset

        Returns:
            dict: Evaluation results
        """
        # Set up generation pipeline
        generator = pipeline(
            "text-generation",
            model=model,
            tokenizer=tokenizer,
            device="cpu"
        )

        # Sample a subset for evaluation (to keep runtime reasonable)
        dataset = self._limit_samples(dataset, 100)
        num_examples = len(dataset)

        # Track metrics
        correct = 0
        total = 0
        generated_texts = []

        # Process each example
        for i, example in enumerate(dataset):
            self._report_loop_progress(i, num_examples)

            input_text = example.get("input", example.get("prompt", ""))
            expected_output = example.get("output", example.get("target", ""))
            if not input_text or not expected_output:
                continue

            # Generate text. max_new_tokens bounds only the continuation, so
            # prompts longer than the budget can still be completed.
            generated = generator(
                input_text,
                max_new_tokens=100,
                num_return_sequences=1
            )
            generated_text = generated[0]["generated_text"]
            generated_texts.append(generated_text)

            # Simple containment check against the completion only
            completion = self._strip_prompt(input_text, generated_text)
            if expected_output.strip() in completion:
                correct += 1
            total += 1

        # Calculate metrics
        accuracy = correct / total if total > 0 else 0
        return {
            "accuracy": accuracy,
            "samples_evaluated": total,
            "generated_samples": generated_texts[:5]  # Include a few samples
        }

    def _evaluate_question_answering(self, model, tokenizer, dataset):
        """Evaluate a model on question answering tasks.

        Args:
            model: HuggingFace model
            tokenizer: HuggingFace tokenizer
            dataset: HuggingFace dataset

        Returns:
            dict: Evaluation results
        """
        # NOTE(review): the model handed in is loaded with
        # AutoModelForCausalLM in _run_evaluation, while this pipeline
        # presumably expects a model with a QA head - confirm this path works.
        qa_pipeline = pipeline(
            "question-answering",
            model=model,
            tokenizer=tokenizer,
            device="cpu"
        )

        # Sample a subset for evaluation
        dataset = self._limit_samples(dataset, 100)
        num_examples = len(dataset)

        # Track metrics
        exact_matches = 0
        f1_scores = []
        total = 0

        # Process each example
        for i, example in enumerate(dataset):
            self._report_loop_progress(i, num_examples)

            question = example.get("question", "")
            context = example.get("context", "")
            answer = example.get("answer", "")
            if not question or not answer:
                continue

            # Get model prediction; if no context is provided, the question
            # itself is used as context.
            result = qa_pipeline(question=question, context=context or question)
            predicted_answer = result["answer"]

            # Calculate exact match
            if predicted_answer.strip() == answer.strip():
                exact_matches += 1

            # Calculate F1 score
            f1_scores.append(self._calculate_f1(answer, predicted_answer))
            total += 1

        # Calculate metrics
        exact_match_accuracy = exact_matches / total if total > 0 else 0
        avg_f1 = sum(f1_scores) / len(f1_scores) if f1_scores else 0
        return {
            "exact_match": exact_match_accuracy,
            "f1": avg_f1,
            "samples_evaluated": total
        }

    def _evaluate_classification(self, model, tokenizer, dataset):
        """Evaluate a model on classification tasks.

        Args:
            model: HuggingFace model
            tokenizer: HuggingFace tokenizer
            dataset: HuggingFace dataset

        Returns:
            dict: Evaluation results
        """
        # NOTE(review): the model handed in is loaded with
        # AutoModelForCausalLM in _run_evaluation; a classification pipeline
        # presumably needs a sequence-classification head - confirm.
        classifier = pipeline(
            "text-classification",
            model=model,
            tokenizer=tokenizer,
            device="cpu"
        )

        # Sample a subset for evaluation
        dataset = self._limit_samples(dataset, 100)
        num_examples = len(dataset)

        # Track metrics
        correct = 0
        total = 0

        # Process each example
        for i, example in enumerate(dataset):
            self._report_loop_progress(i, num_examples)

            text = example.get("text", example.get("sentence", ""))
            label = str(example.get("label", example.get("class", "")))
            if not text or not label:
                continue

            # Get model prediction
            result = classifier(text)
            predicted_label = result[0]["label"]

            # Check if correct (both sides compared as strings)
            if str(predicted_label) == label:
                correct += 1
            total += 1

        # Calculate metrics
        accuracy = correct / total if total > 0 else 0
        return {
            "accuracy": accuracy,
            "samples_evaluated": total
        }

    def _evaluate_code_generation(self, model, tokenizer, dataset):
        """Evaluate a model on code generation tasks.

        Generated code is never executed. An exact match with the reference
        solution earns full credit; otherwise half credit is given when the
        output contains a code-like keyword.

        Args:
            model: HuggingFace model
            tokenizer: HuggingFace tokenizer
            dataset: HuggingFace dataset

        Returns:
            dict: Evaluation results
        """
        # Set up generation pipeline
        generator = pipeline(
            "text-generation",
            model=model,
            tokenizer=tokenizer,
            device="cpu"
        )

        # Sample a subset for evaluation (smaller sample for code tasks)
        dataset = self._limit_samples(dataset, 50)
        num_examples = len(dataset)

        # Track metrics
        exact_matches = 0
        functional_matches = 0
        total = 0
        code_keywords = ["def ", "function", "return", "class"]

        # Process each example
        for i, example in enumerate(dataset):
            self._report_loop_progress(i, num_examples)

            prompt = example.get("prompt", example.get("input", ""))
            solution = example.get("solution", example.get("output", ""))
            if not prompt or not solution:
                continue

            # Generate code. max_new_tokens bounds only the continuation.
            generated = generator(
                prompt,
                max_new_tokens=200,
                num_return_sequences=1
            )

            # Extract code from generated text (remove the echoed prompt)
            generated_code = self._strip_prompt(prompt, generated[0]["generated_text"])

            # Check exact match
            if generated_code.strip() == solution.strip():
                exact_matches += 1
                functional_matches += 1
            elif len(generated_code) > 0 and any(keyword in generated_code for keyword in code_keywords):
                # Functional correctness would require executing the code,
                # which is complex and potentially unsafe; use a simple
                # keyword heuristic for partial credit instead.
                functional_matches += 0.5
            total += 1

        # Calculate metrics
        exact_match_rate = exact_matches / total if total > 0 else 0
        functional_correctness = functional_matches / total if total > 0 else 0
        return {
            "exact_match": exact_match_rate,
            "functional_correctness": functional_correctness,
            "samples_evaluated": total
        }

    def _evaluate_general(self, model, tokenizer, dataset):
        """General evaluation for any dataset type.

        No metric is computed; the model output is only collected next to the
        input and, when an output field exists, the expected value.

        Args:
            model: HuggingFace model
            tokenizer: HuggingFace tokenizer
            dataset: HuggingFace dataset

        Returns:
            dict: Evaluation results
        """
        # Set up generation pipeline
        generator = pipeline(
            "text-generation",
            model=model,
            tokenizer=tokenizer,
            device="cpu"
        )

        # Sample a subset for evaluation
        dataset = self._limit_samples(dataset, 50)
        num_examples = len(dataset)

        # Find input and output fields (the last matching field wins)
        features = dataset.features
        input_field = None
        output_field = None
        for field in features:
            if field.lower() in ["input", "prompt", "question", "text"]:
                input_field = field
            elif field.lower() in ["output", "target", "answer", "response"]:
                output_field = field

        if not input_field:
            # Just use the first string field as input
            for field in features:
                if self._is_string_feature(features[field]):
                    input_field = field
                    break

        # Track metrics
        total = 0
        generated_texts = []

        # Process each example
        for i, example in enumerate(dataset):
            self._report_loop_progress(i, num_examples)

            if not (input_field and input_field in example):
                continue
            input_text = str(example[input_field])

            # Generate text. max_new_tokens bounds only the continuation.
            generated = generator(
                input_text,
                max_new_tokens=100,
                num_return_sequences=1
            )
            generated_text = generated[0]["generated_text"]
            has_expected = output_field and output_field in example
            generated_texts.append({
                "input": input_text,
                "output": generated_text,
                "expected": str(example[output_field]) if has_expected else "N/A"
            })
            total += 1

        return {
            "samples_evaluated": total,
            "generated_samples": generated_texts[:5]  # Include a few samples
        }

    def _calculate_f1(self, answer, prediction):
        """Calculate token-level F1 score between answer and prediction.

        Tokens are lower-cased and split on whitespace. The overlap is
        counted as a multiset so repeated tokens are matched once per
        occurrence, keeping precision and recall within [0, 1] and giving
        identical strings a score of 1.0.

        Args:
            answer: Ground truth answer
            prediction: Model prediction

        Returns:
            float: F1 score
        """
        from collections import Counter

        # Tokenize
        answer_tokens = answer.lower().split()
        prediction_tokens = prediction.lower().split()

        # Count shared tokens, respecting multiplicity
        shared = Counter(answer_tokens) & Counter(prediction_tokens)
        overlap = sum(shared.values())
        if overlap == 0:
            # Also covers an empty answer or prediction
            return 0.0

        precision = overlap / len(prediction_tokens)
        recall = overlap / len(answer_tokens)
        return 2 * precision * recall / (precision + recall)

    def _calculate_overall_score(self, results):
        """Calculate an overall score from evaluation results.

        The score is the plain average of every known metric present in
        ``results``, each scaled to 0-100. When none is present the score
        defaults to 50.0.

        Args:
            results: Evaluation results dictionary

        Returns:
            float: Overall score between 0 and 100
        """
        metric_names = ("accuracy", "exact_match", "f1", "functional_correctness")
        scaled = [results[name] * 100 for name in metric_names if name in results]
        if not scaled:
            # Default score if no metrics available
            return 50.0
        return sum(scaled) / len(scaled)

    def submit_evaluation(self, model_id, benchmark_id, user_id, priority=0):
        """Submit a model for evaluation on a benchmark.

        Args:
            model_id: Model ID in the database
            benchmark_id: Benchmark ID in the database
            user_id: User ID submitting the evaluation
            priority: Queue priority (higher = higher priority)

        Returns:
            tuple: (evaluation_id, message); evaluation_id is None when the
                submission was rejected or failed
        """
        # Check if user can submit today
        if not self.auth_manager.can_submit_benchmark(user_id):
            return None, "Daily submission limit reached. Try again tomorrow."

        try:
            # Add evaluation to database and queue
            evaluation_id = self.db_manager.add_evaluation(
                model_id=model_id,
                benchmark_id=benchmark_id,
                priority=priority
            )

            # Update user's last submission date
            self.auth_manager.update_submission_date(user_id)

            # Make sure worker is running
            self.start_worker()
            return evaluation_id, "Evaluation submitted successfully."
        except Exception as e:
            print(f"Submit evaluation error: {e}")
            return None, f"Failed to submit evaluation: {str(e)}"

    def get_queue_status(self):
        """Get the current status of the evaluation queue.

        Returns:
            dict: Queue status information; on failure all counts are zero
                and an ``error`` entry carries the exception text
        """
        try:
            # Count evaluations per status in the database
            counts = {
                status: len(self.db_manager.get_evaluation_results(status=status))
                for status in ("pending", "running", "completed", "failed")
            }

            # Get current evaluation progress
            current_eval, progress = self.get_current_progress()
            return {
                **counts,
                "is_processing": self.is_processing,
                "current_evaluation": current_eval,
                "progress": progress
            }
        except Exception as e:
            print(f"Queue status error: {e}")
            return {
                "pending": 0,
                "running": 0,
                "completed": 0,
                "failed": 0,
                "is_processing": self.is_processing,
                "current_evaluation": None,
                "progress": 0,
                "error": str(e)
            }
# Model submission UI components
def create_model_submission_ui(evaluation_queue, auth_manager, db_manager):
    """Create the model submission UI components.

    Builds two tabs: "Submit Model" (a form that registers a model and queues
    it for evaluation) and "Evaluation Queue" (queue statistics, recent
    evaluations and the progress of the evaluation currently running).

    Args:
        evaluation_queue: Evaluation queue instance
        auth_manager: Authentication manager instance
        db_manager: Database manager instance

    Returns:
        gr.Blocks: Gradio Blocks component with model submission UI
    """
    with gr.Blocks() as submission_ui:
        with gr.Tab("Submit Model"):
            with gr.Row():
                with gr.Column(scale=2):
                    model_id_input = gr.Textbox(
                        placeholder="HuggingFace model ID (e.g., 'gpt2', 'facebook/opt-350m')",
                        label="Model ID"
                    )
                    model_name_input = gr.Textbox(
                        placeholder="Display name for your model",
                        label="Model Name"
                    )
                    model_description_input = gr.Textbox(
                        placeholder="Brief description of your model",
                        label="Description",
                        lines=3
                    )
                    model_parameters_input = gr.Number(
                        label="Number of Parameters (billions)",
                        precision=2
                    )
                with gr.Column(scale=1):
                    model_tag_input = gr.Dropdown(
                        choices=evaluation_queue.model_tags,
                        label="Model Tag",
                        info="Select one category that best describes your model"
                    )
                    benchmark_dropdown = gr.Dropdown(
                        label="Benchmark",
                        info="Select a benchmark to evaluate your model on"
                    )
                    refresh_benchmarks_button = gr.Button("Refresh Benchmarks")
            submit_model_button = gr.Button("Submit for Evaluation")
            submission_status = gr.Markdown("")

        with gr.Tab("Evaluation Queue"):
            refresh_queue_button = gr.Button("Refresh Queue")
            with gr.Row():
                with gr.Column(scale=1):
                    queue_stats = gr.JSON(
                        label="Queue Statistics"
                    )
                with gr.Column(scale=2):
                    queue_status = gr.Dataframe(
                        headers=["ID", "Model", "Benchmark", "Status", "Submitted"],
                        label="Recent Evaluations"
                    )
            with gr.Row(visible=True) as progress_container:
                with gr.Column():
                    current_eval_info = gr.Markdown("No evaluation currently running")
                    # Use a simple text display for progress instead of Progress component
                    progress_display = gr.Markdown("Progress: 0%")

        # Event handlers
        def refresh_benchmarks_handler():
            """Reload the benchmark dropdown choices from the database."""
            benchmarks = db_manager.get_benchmarks()
            # Dropdown tuples are (display name, value): show the benchmark
            # name and pass its ID on to the submit handler.
            choices = [(b["name"], str(b["id"])) for b in benchmarks]
            return gr.update(choices=choices)

        def submit_model_handler(model_id, model_name, model_description, model_parameters, model_tag, benchmark_id, request: gr.Request):
            """Register the model and queue it; return a status message."""
            # Check if user is logged in
            user = auth_manager.check_login(request)
            if not user:
                return "Please log in to submit a model."

            if not model_id or not model_name or not model_tag or not benchmark_id:
                return "Please fill in all required fields."

            try:
                # Add model to database
                model_db_id = db_manager.add_model(
                    name=model_name,
                    hf_model_id=model_id,
                    user_id=user["id"],
                    tag=model_tag,
                    parameters=str(model_parameters) if model_parameters else None,
                    description=model_description
                )
                if not model_db_id:
                    return "Failed to add model to database."

                # Submit for evaluation
                eval_id, message = evaluation_queue.submit_evaluation(
                    model_id=model_db_id,
                    benchmark_id=benchmark_id,
                    user_id=user["id"]
                )
                if eval_id:
                    return f"Model submitted successfully. Evaluation ID: {eval_id}"
                return message
            except Exception as e:
                return f"Error submitting model: {str(e)}"

        def refresh_queue_handler():
            """Collect queue statistics, recent evaluations and progress."""
            # Get queue statistics
            stats = evaluation_queue.get_queue_status()

            # Get recent evaluations (all statuses, limited to 20)
            evals = db_manager.get_evaluation_results(limit=20)

            # Format for dataframe
            eval_data = [
                [
                    record["id"],
                    record["model_name"],
                    record["benchmark_name"],
                    record["status"],
                    record["submitted_at"]
                ]
                for record in evals
            ]

            # Also update progress display
            current_eval, progress = evaluation_queue.get_current_progress()
            if current_eval:
                model_info = db_manager.get_model(current_eval['model_id'])
                benchmark_info = db_manager.get_benchmark(current_eval['benchmark_id'])
                if model_info and benchmark_info:
                    eval_info = f"**Currently Evaluating:** {model_info['name']} on {benchmark_info['name']}"
                    progress_text = f"Progress: {progress}%"
                    return stats, eval_data, eval_info, progress_text

            return stats, eval_data, "No evaluation currently running", "Progress: 0%"

        # Connect event handlers
        refresh_benchmarks_button.click(
            fn=refresh_benchmarks_handler,
            inputs=[],
            outputs=[benchmark_dropdown]
        )
        submit_model_button.click(
            fn=submit_model_handler,
            inputs=[
                model_id_input,
                model_name_input,
                model_description_input,
                model_parameters_input,
                model_tag_input,
                benchmark_dropdown
            ],
            outputs=[submission_status]
        )
        refresh_queue_button.click(
            fn=refresh_queue_handler,
            inputs=[],
            outputs=[queue_stats, queue_status, current_eval_info, progress_display]
        )

        # Initialize on load
        submission_ui.load(
            fn=refresh_benchmarks_handler,
            inputs=[],
            outputs=[benchmark_dropdown]
        )
    return submission_ui