|
""" |
|
Database schema for the Dynamic Highscores system.

This module defines the SQLite database schema for the Dynamic Highscores system,
which integrates benchmark selection, model evaluation, and leaderboard functionality.
|
""" |
|
|
|
import sqlite3 |
|
import os |
|
import json |
|
import threading |
|
from datetime import datetime, timedelta |
|
import pandas as pd |
|
|
|
class ThreadLocalDB:
    """Hand out one SQLite connection (and one cursor) per thread.

    The connection and cursor are stored on a ``threading.local`` object that
    belongs to this instance, so two ``ThreadLocalDB`` objects pointing at
    different database files never share a connection, and closing one does
    not affect the other.
    """

    def __init__(self, db_path):
        """Remember the database path; no connection is opened yet.

        Args:
            db_path: Path passed verbatim to ``sqlite3.connect``.
        """
        self.db_path = db_path
        # Per-instance storage.  A class-level ``threading.local`` would be
        # shared by every instance, making the second instance created in a
        # thread silently reuse the first instance's connection.
        self._thread_local = threading.local()

    def get_connection(self):
        """Return the calling thread's connection, opening it on first use.

        Returns:
            A ``sqlite3.Connection`` whose ``row_factory`` is ``sqlite3.Row``.
        """
        conn = getattr(self._thread_local, "conn", None)
        if conn is None:
            conn = sqlite3.connect(self.db_path)
            conn.row_factory = sqlite3.Row
            self._thread_local.conn = conn
        return conn

    def get_cursor(self):
        """Return the calling thread's cursor, creating it on first use.

        The same cursor object is returned on every call from a given thread
        until ``close`` is called.
        """
        conn = self.get_connection()
        cursor = getattr(self._thread_local, "cursor", None)
        if cursor is None:
            cursor = conn.cursor()
            self._thread_local.cursor = cursor
        return cursor

    def close(self):
        """Close the calling thread's cursor and connection, if any.

        Safe to call repeatedly; a later ``get_connection`` opens a fresh
        connection.
        """
        conn = getattr(self._thread_local, "conn", None)
        if conn is None:
            return

        cursor = getattr(self._thread_local, "cursor", None)
        if cursor is not None:
            cursor.close()
            self._thread_local.cursor = None

        conn.close()
        self._thread_local.conn = None
|
|
|
class DynamicHighscoresDB:
    """Database manager for the Dynamic Highscores system.

    Wraps a SQLite database with five tables -- ``users``, ``benchmarks``,
    ``models``, ``evaluations`` and ``queue`` -- and exposes helpers to insert
    into and query them.  Connections and cursors are obtained from a
    ``ThreadLocalDB`` helper, and every write is committed before the method
    that performed it returns.
    """

    # DDL executed, in order, by ``create_tables``.
    _SCHEMA = (
        '''
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT UNIQUE NOT NULL,
            hf_user_id TEXT UNIQUE NOT NULL,
            is_admin BOOLEAN DEFAULT 0,
            last_submission_date TEXT,
            created_at TEXT DEFAULT CURRENT_TIMESTAMP
        )
        ''',
        '''
        CREATE TABLE IF NOT EXISTS benchmarks (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            dataset_id TEXT NOT NULL,
            description TEXT,
            metrics TEXT,  -- JSON string of metrics
            created_at TEXT DEFAULT CURRENT_TIMESTAMP
        )
        ''',
        '''
        CREATE TABLE IF NOT EXISTS models (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            hf_model_id TEXT NOT NULL,
            user_id INTEGER NOT NULL,
            tag TEXT NOT NULL,  -- One of: Merge, Agent, Reasoning, Coding, etc.
            parameters TEXT,  -- Number of parameters (can be NULL)
            description TEXT,
            created_at TEXT DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (user_id) REFERENCES users (id),
            UNIQUE (hf_model_id, user_id)
        )
        ''',
        '''
        CREATE TABLE IF NOT EXISTS evaluations (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            model_id INTEGER NOT NULL,
            benchmark_id INTEGER NOT NULL,
            status TEXT NOT NULL,  -- pending, running, completed, failed
            results TEXT,  -- JSON string of results
            score REAL,  -- Overall score (can be NULL)
            submitted_at TEXT DEFAULT CURRENT_TIMESTAMP,
            started_at TEXT,
            completed_at TEXT,
            FOREIGN KEY (model_id) REFERENCES models (id),
            FOREIGN KEY (benchmark_id) REFERENCES benchmarks (id)
        )
        ''',
        '''
        CREATE TABLE IF NOT EXISTS queue (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            evaluation_id INTEGER NOT NULL,
            priority INTEGER DEFAULT 0,  -- Higher number = higher priority
            added_at TEXT DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (evaluation_id) REFERENCES evaluations (id)
        )
        ''',
    )

    def __init__(self, db_path="dynamic_highscores.db"):
        """Set up connection management and create the tables if missing.

        Args:
            db_path: Path of the SQLite database file.
        """
        self.db_path = db_path
        self.thread_local_db = ThreadLocalDB(db_path)
        self.create_tables()

    # ------------------------------------------------------------------
    # Connection plumbing
    # ------------------------------------------------------------------

    def get_conn(self):
        """Return the connection supplied by the ``ThreadLocalDB`` helper."""
        return self.thread_local_db.get_connection()

    def get_cursor(self):
        """Return the cursor supplied by the ``ThreadLocalDB`` helper."""
        return self.thread_local_db.get_cursor()

    def close(self):
        """Close the connection held by the ``ThreadLocalDB`` helper."""
        self.thread_local_db.close()

    def create_tables(self):
        """Create every table in ``_SCHEMA`` that does not exist yet, then commit."""
        cursor = self.get_cursor()
        conn = self.get_conn()
        for statement in self._SCHEMA:
            cursor.execute(statement)
        conn.commit()

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _is_specific(value):
        """Return True when *value* is a real filter: truthy and not "all".

        ``str()`` is applied before lower-casing so that integer ids are
        accepted as well as strings.
        """
        return bool(value) and str(value).lower() != "all"

    @staticmethod
    def _decode_metrics(raw):
        """Decode a stored metrics JSON string; empty or NULL becomes ``{}``."""
        return json.loads(raw) if raw else {}

    def _fetch_one(self, sql, params=()):
        """Run *sql* and return the first row as a dict, or None if no row."""
        cursor = self.get_cursor()
        cursor.execute(sql, params)
        row = cursor.fetchone()
        return dict(row) if row else None

    def _insert_or_lookup(self, insert_sql, insert_params, lookup_sql, lookup_params):
        """Insert a row; on a constraint violation return the existing row's id.

        Returns:
            The new row id, or -- when the insert raised
            ``sqlite3.IntegrityError`` -- the first column of the row found by
            *lookup_sql*, or None if that lookup finds nothing.
        """
        cursor = self.get_cursor()
        conn = self.get_conn()
        try:
            cursor.execute(insert_sql, insert_params)
            conn.commit()
            return cursor.lastrowid
        except sqlite3.IntegrityError:
            # The failed INSERT leaves the implicitly opened transaction
            # active; end it so it is not carried into later statements.
            conn.rollback()
            cursor.execute(lookup_sql, lookup_params)
            row = cursor.fetchone()
            return row[0] if row else None

    # ------------------------------------------------------------------
    # Users
    # ------------------------------------------------------------------

    def add_user(self, username, hf_user_id, is_admin=False):
        """Add a user, or return the id of the user with the same ``hf_user_id``.

        Returns:
            The user id.  None if the insert violated a constraint but no row
            with this ``hf_user_id`` exists (for example, the username is
            taken by a different ``hf_user_id``).
        """
        return self._insert_or_lookup(
            "INSERT INTO users (username, hf_user_id, is_admin) VALUES (?, ?, ?)",
            (username, hf_user_id, is_admin),
            "SELECT id FROM users WHERE hf_user_id = ?",
            (hf_user_id,),
        )

    def get_user(self, hf_user_id):
        """Return the user row for a HuggingFace user id as a dict, or None."""
        return self._fetch_one(
            "SELECT * FROM users WHERE hf_user_id = ?",
            (hf_user_id,),
        )

    def get_user_by_username(self, username):
        """Return the user row for a username as a dict, or None."""
        return self._fetch_one(
            "SELECT * FROM users WHERE username = ?",
            (username,),
        )

    def can_submit_today(self, user_id):
        """Return whether the user may submit an evaluation today.

        Unknown users may not submit.  Admins and users without a recorded
        submission always may.  Otherwise the stored timestamp must be earlier
        than midnight of the current day, using the naive local clock that
        ``update_submission_date`` also writes.
        """
        user = self._fetch_one(
            "SELECT is_admin, last_submission_date FROM users WHERE id = ?",
            (user_id,),
        )
        if not user:
            return False
        if user['is_admin'] or not user['last_submission_date']:
            return True

        last_date = datetime.fromisoformat(user['last_submission_date'])
        today = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
        return last_date < today

    def update_submission_date(self, user_id):
        """Set the user's ``last_submission_date`` to the current local time."""
        cursor = self.get_cursor()
        conn = self.get_conn()
        cursor.execute(
            "UPDATE users SET last_submission_date = ? WHERE id = ?",
            (datetime.now().isoformat(), user_id),
        )
        conn.commit()

    # ------------------------------------------------------------------
    # Benchmarks
    # ------------------------------------------------------------------

    def add_benchmark(self, name, dataset_id, description="", metrics=None):
        """Add a benchmark and return its id.

        ``metrics`` is stored as a JSON string (``{}`` when omitted).  If the
        insert violates a constraint, the id of a benchmark with the same
        ``dataset_id`` is returned instead (None when there is none).
        """
        # NOTE(review): the benchmarks table has no UNIQUE constraint, so
        # calling this twice with the same dataset_id inserts two rows --
        # confirm whether de-duplication by dataset_id was intended.
        metrics_json = json.dumps({} if metrics is None else metrics)
        return self._insert_or_lookup(
            "INSERT INTO benchmarks (name, dataset_id, description, metrics) VALUES (?, ?, ?, ?)",
            (name, dataset_id, description, metrics_json),
            "SELECT id FROM benchmarks WHERE dataset_id = ?",
            (dataset_id,),
        )

    def get_benchmarks(self):
        """Return all benchmarks as dicts with ``metrics`` decoded from JSON."""
        cursor = self.get_cursor()
        cursor.execute("SELECT * FROM benchmarks")
        benchmarks = [dict(row) for row in cursor.fetchall()]
        for benchmark in benchmarks:
            benchmark['metrics'] = self._decode_metrics(benchmark['metrics'])
        return benchmarks

    def get_benchmark(self, benchmark_id):
        """Return one benchmark as a dict, or None if the id is unknown.

        ``metrics`` is decoded from JSON; an empty or NULL value becomes
        ``{}``, matching ``get_benchmarks``.
        """
        benchmark = self._fetch_one(
            "SELECT * FROM benchmarks WHERE id = ?",
            (benchmark_id,),
        )
        if benchmark:
            benchmark['metrics'] = self._decode_metrics(benchmark['metrics'])
        return benchmark

    # ------------------------------------------------------------------
    # Models
    # ------------------------------------------------------------------

    def add_model(self, name, hf_model_id, user_id, tag, parameters=None, description=""):
        """Add a model, or return the id of the existing row.

        ``(hf_model_id, user_id)`` is unique in the schema; when the insert
        violates a constraint the row with that pair is looked up and its id
        returned (None if no such row exists).
        """
        return self._insert_or_lookup(
            "INSERT INTO models (name, hf_model_id, user_id, tag, parameters, description) VALUES (?, ?, ?, ?, ?, ?)",
            (name, hf_model_id, user_id, tag, parameters, description),
            "SELECT id FROM models WHERE hf_model_id = ? AND user_id = ?",
            (hf_model_id, user_id),
        )

    def get_models(self, tag=None):
        """Return models as dicts; a tag other than "all" filters on ``tag``."""
        cursor = self.get_cursor()
        if self._is_specific(tag):
            cursor.execute("SELECT * FROM models WHERE tag = ?", (tag,))
        else:
            cursor.execute("SELECT * FROM models")
        return [dict(row) for row in cursor.fetchall()]

    def get_model(self, model_id):
        """Return one model row as a dict, or None if the id is unknown."""
        return self._fetch_one(
            "SELECT * FROM models WHERE id = ?",
            (model_id,),
        )

    # ------------------------------------------------------------------
    # Evaluations and queue
    # ------------------------------------------------------------------

    def add_evaluation(self, model_id, benchmark_id, priority=0):
        """Create a pending evaluation and enqueue it; return the evaluation id.

        Both inserts are committed together.  If either fails, the
        transaction is rolled back and the ``sqlite3.Error`` is re-raised, so
        no evaluation is left without a queue entry.
        """
        cursor = self.get_cursor()
        conn = self.get_conn()
        try:
            cursor.execute(
                "INSERT INTO evaluations (model_id, benchmark_id, status) VALUES (?, ?, 'pending')",
                (model_id, benchmark_id),
            )
            evaluation_id = cursor.lastrowid
            cursor.execute(
                "INSERT INTO queue (evaluation_id, priority) VALUES (?, ?)",
                (evaluation_id, priority),
            )
            conn.commit()
        except sqlite3.Error:
            conn.rollback()
            raise
        return evaluation_id

    def update_evaluation_status(self, evaluation_id, status, results=None, score=None):
        """Update an evaluation's status and, optionally, its results and score.

        Args:
            evaluation_id: Row id in ``evaluations``.
            status: New status.  ``'completed'`` / ``'failed'`` also stamp
                ``completed_at``; ``'running'`` stamps ``started_at`` (both
                via SQLite ``datetime('now')``).
            results: JSON-serialisable object stored in ``results`` when not
                None.
            score: Value stored in ``score`` when not None.
        """
        cursor = self.get_cursor()
        conn = self.get_conn()

        # Build the SET clauses and their parameters side by side so the
        # placeholder order can never drift from the parameter order.
        assignments = ["status = ?"]
        params = [status]

        if results is not None:
            assignments.append("results = ?")
            params.append(json.dumps(results))

        if score is not None:
            assignments.append("score = ?")
            params.append(score)

        if status in ('completed', 'failed'):
            assignments.append("completed_at = datetime('now')")
        elif status == 'running':
            assignments.append("started_at = datetime('now')")

        sql = "UPDATE evaluations SET " + ", ".join(assignments) + " WHERE id = ?"
        params.append(evaluation_id)

        cursor.execute(sql, params)
        conn.commit()

    def get_next_in_queue(self):
        """Return the next pending queue entry as a dict, or None.

        Ordered by highest ``priority`` first, then earliest ``added_at``.
        """
        return self._fetch_one("""
            SELECT q.*, e.id as evaluation_id, e.model_id, e.benchmark_id, e.status
            FROM queue q
            JOIN evaluations e ON q.evaluation_id = e.id
            WHERE e.status = 'pending'
            ORDER BY q.priority DESC, q.added_at ASC
            LIMIT 1
        """)

    def get_evaluation_results(self, model_id=None, benchmark_id=None, tag=None,
                               status=None, limit=None):
        """Return evaluations joined with model and benchmark names.

        Args:
            model_id: Restrict to this model when truthy.
            benchmark_id: Restrict to this benchmark unless falsy or "all"
                (case-insensitive).  May be an int or a string.
            tag: Restrict to models with this tag unless falsy or "all".
            status: Restrict to this status when truthy.
            limit: Maximum number of rows when truthy.

        Returns:
            A list of dicts, newest submission first.  A non-empty ``results``
            column is decoded from JSON; undecodable content becomes ``{}``.
        """
        cursor = self.get_cursor()

        sql = """
            SELECT e.id, e.model_id, e.benchmark_id, e.status, e.results, e.score,
                   e.submitted_at, e.started_at, e.completed_at, m.name as model_name, m.tag,
                   b.name as benchmark_name
            FROM evaluations e
            JOIN models m ON e.model_id = m.id
            JOIN benchmarks b ON e.benchmark_id = b.id
            WHERE 1=1
        """
        params = []

        if status:
            sql += " AND e.status = ?"
            params.append(status)

        if model_id:
            sql += " AND e.model_id = ?"
            params.append(model_id)

        # Integer ids are valid here; only the literal "all" disables the filter.
        if self._is_specific(benchmark_id):
            sql += " AND e.benchmark_id = ?"
            params.append(benchmark_id)

        if self._is_specific(tag):
            sql += " AND m.tag = ?"
            params.append(tag)

        sql += " ORDER BY e.submitted_at DESC"

        if limit:
            sql += " LIMIT ?"
            params.append(limit)

        cursor.execute(sql, params)
        results = [dict(row) for row in cursor.fetchall()]

        for result in results:
            if result['results']:
                try:
                    result['results'] = json.loads(result['results'])
                except (ValueError, TypeError):
                    # Corrupt or non-text payload: fall back to an empty
                    # result set instead of failing the whole query.
                    result['results'] = {}

        return results

    # ------------------------------------------------------------------
    # Leaderboard
    # ------------------------------------------------------------------

    def get_leaderboard_df(self, tag=None, benchmark_id=None):
        """Return completed evaluations as a DataFrame sorted by score.

        Each row has ``model_name``, ``tag``, ``benchmark_name``, ``score``
        and ``completed_at``; numeric entries of the evaluation's results
        dict are added as extra columns unless they would clash with those
        names.  An empty DataFrame is returned when nothing matches.
        """
        results = self.get_evaluation_results(
            tag=tag, benchmark_id=benchmark_id, status="completed"
        )
        if not results:
            return pd.DataFrame()

        leaderboard_data = []
        for result in results:
            entry = {
                'model_name': result['model_name'],
                'tag': result['tag'],
                'benchmark_name': result['benchmark_name'],
                'score': result['score'],
                'completed_at': result['completed_at'],
            }

            metrics = result['results']
            if metrics and isinstance(metrics, dict):
                for key, value in metrics.items():
                    if isinstance(value, (int, float)) and key not in entry:
                        entry[key] = value

            leaderboard_data.append(entry)

        df = pd.DataFrame(leaderboard_data)
        if not df.empty and 'score' in df.columns:
            df = df.sort_values('score', ascending=False)
        return df