davidpomerenke's picture
Upload from GitHub Actions: Merge pull request #18 from datenlabor-bmz/pr-17
a0d1624 verified
raw
history blame
7.04 kB
import asyncio
import pandas as pd
import time
from datetime import datetime, timedelta
from models import models
from tasks import tasks
from languages import languages
import os
async def evaluate():
# Configuration - easily adjustable defaults
n_sentences = int(
os.environ.get("N_SENTENCES", 20)
) # Default: 20 sentences per task
max_languages = int(
os.environ.get("MAX_LANGUAGES", 150)
) # Default: 150 top languages
single_model = os.environ.get(
"SINGLE_MODEL"
) # Optional: run only one specific model
test_mode = os.environ.get("TEST", "").lower() in (
"1",
"true",
"yes",
) # Optional: skip results loading/saving
# Keep original DataFrames for saving metadata - distinction added for single model test runs.
original_models_df = pd.DataFrame(models)
original_languages_df = pd.DataFrame(languages)
# Create working copies for single evaluation runs
models_df = original_models_df.copy()
languages_df = original_languages_df.copy()
top_languages = languages.head(max_languages)
# Filter to single model if specified (only affects evaluation, not saving)
if single_model:
models_df = models_df[models_df["id"] == single_model]
if len(models_df) == 0:
print(f"Error: Model '{single_model}' not found. Available models:")
for model_id in original_models_df["id"]:
print(f" {model_id}")
return pd.DataFrame()
print(
f"Starting evaluation: {len(models_df)} models, {len(top_languages)} languages, {n_sentences} sentences per task"
)
if test_mode:
print("TEST MODE: Skipping results loading/saving")
start_time = time.time()
# Load existing results to avoid re-evaluation (skip in test mode)
if test_mode:
old_results = pd.DataFrame(
columns=["model", "bcp_47", "task", "metric", "origin", "score"]
)
else:
old_results = pd.read_json("results.json")
# Get all combinations that need evaluation
combis = [
(model, lang.bcp_47, task_name)
for model in models_df["id"]
for lang in top_languages.itertuples()
for task_name, task in tasks.items()
if task_name in models_df[models_df["id"] == model]["tasks"].iloc[0]
]
# Filter out already evaluated combinations
combis = pd.DataFrame(combis, columns=["model", "bcp_47", "task"])
if not old_results.empty:
completed = set(old_results[["model", "bcp_47", "task"]].apply(tuple, axis=1))
# set + combis is faster than merge (locally it made a difference for me when loading all data/tasks into memory)
mask = ~combis.apply(
lambda row: (row["model"], row["bcp_47"], row["task"]) in completed, axis=1
)
combis = combis[mask]
# Create all evaluation tasks
all_tasks = []
for i in range(n_sentences):
for model, bcp_47, task_name in combis.itertuples(index=False):
all_tasks.append((tasks[task_name], model, bcp_47, i))
print(f"Running {len(all_tasks)} evaluation tasks...")
# For single model runs, we stop immediately on first API error to inspect.
# For full evaluations, we continue despite errors to get maximum coverage.
stop_on_error = single_model is not None
# Process tasks in batches to avoid memory issues (for full evaluation locally that helped a lot)
batch_size = 1000
all_results = []
try:
for i in range(0, len(all_tasks), batch_size):
batch = all_tasks[i : i + batch_size]
batch_results = await asyncio.gather(
*[
task_func(model, bcp_47, sentence_nr)
for task_func, model, bcp_47, sentence_nr in batch
],
return_exceptions=not stop_on_error,
)
all_results.extend(batch_results)
results = all_results
# Process results and logging API errors separately to understand what are the main issues.
valid_results = []
errors = []
for i, r in enumerate(results):
if isinstance(r, Exception):
if i < len(all_tasks):
task_info = all_tasks[i]
errors.append(f"{task_info[1]},{task_info[2]},{str(r)}")
elif isinstance(r, list):
valid_results.extend(r)
elif r is not None:
valid_results.append(r)
# log errors and store
if errors:
with open("errors.log", "w") as f:
f.write("model,task,error\n")
for error in errors:
f.write(error + "\n")
# Track model completion (TO BE DELETED - was for local run only)
if valid_results:
completed_models = set()
for result in valid_results:
if isinstance(result, dict) and "model" in result:
model = result["model"]
if model not in completed_models:
completed_models.add(model)
print(f"Completed: {model}")
print(f"Completed: {len(valid_results)} valid results, {len(errors)} errors")
# this is for local single model runs - for testing and development
except Exception as e:
print(f"EVALUATION STOPPED - API Error occurred:")
print(f"Error type: {type(e).__name__}")
print(f"Error message: {str(e)}")
return pd.DataFrame()
# Save results (skipped in test mode as we do not want to overwrite existing results)
if valid_results:
results_df = pd.DataFrame(valid_results)
# Aggregate results
results_df = (
results_df.groupby(["model", "bcp_47", "task", "metric", "origin"])
.agg({"score": "mean"})
.reset_index()
)
if not test_mode:
args = dict(orient="records", indent=2, force_ascii=False)
# Merge with existing results
if not old_results.empty:
results_df = pd.concat([old_results, results_df])
results_df = results_df.drop_duplicates(
subset=["model", "bcp_47", "task", "metric", "origin"]
)
results_df = results_df.sort_values(
by=["model", "bcp_47", "task", "metric"]
)
results_df.to_json("results.json", **args)
# Save model and language info (always save complete metadata, not filtered)
original_models_df.to_json("models.json", **args)
original_languages_df.to_json("languages.json", **args)
else:
print("TEST MODE: Skipping results saving")
elapsed = time.time() - start_time
print(f"Evaluation completed in {str(timedelta(seconds=int(elapsed)))}")
return results_df
return pd.DataFrame()
if __name__ == "__main__":
results = asyncio.run(evaluate())