import asyncio
import json
import os
import time
from datetime import datetime, timedelta

import pandas as pd
from tqdm.asyncio import tqdm_asyncio

from languages import languages
from models import models
from tasks import tasks
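
# This script evaluates every pending (model, language, task) combination:
# work is gathered into batches of async coroutines, partial results are
# checkpointed to results.json / checkpoint.json after each batch, and an
# interrupted run resumes from the last completed batch. The run is
# configured via the N_SENTENCES and MAX_LANGUAGES environment variables.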
results = pd.DataFrame()
def save_checkpoint(results_df, models_df, languages_df, batch_num, total_batches):
"""Save current progress as checkpoint"""
try:
args = dict(orient="records", indent=2, force_ascii=False)
# Save current results
if len(results_df) > 0:
results_df.to_json("results.json", **args)
print(f"๐Ÿ’พ Checkpoint saved: {len(results_df)} results (batch {batch_num}/{total_batches})")
# Save model and language info
models_df.to_json("models.json", **args)
languages_df.to_json("languages.json", **args)
# Save checkpoint metadata
checkpoint_info = {
"last_batch": batch_num,
"total_batches": total_batches,
"timestamp": datetime.now().isoformat(),
"results_count": len(results_df)
}
with open("checkpoint.json", "w") as f:
json.dump(checkpoint_info, f, indent=2)
except Exception as e:
print(f"โš ๏ธ Failed to save checkpoint: {e}")
def load_checkpoint():
"""Load previous checkpoint if available"""
try:
if os.path.exists("checkpoint.json"):
with open("checkpoint.json", "r") as f:
checkpoint = json.load(f)
print(f"๐Ÿ“‚ Found checkpoint from batch {checkpoint['last_batch']}/{checkpoint['total_batches']}")
return checkpoint
except Exception as e:
print(f"โš ๏ธ Failed to load checkpoint: {e}")
return None
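

# The loop below assumes that each value in `tasks` is an async callable
# (possibly a functools.partial) with the signature task(model, bcp_47,
# sentence_nr), returning one result dict or a list of result dicts that
# contain at least the keys model, bcp_47, task, metric, origin and score;
# the aggregation steps group on exactly those columns.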
async def evaluate():
    # FIXME we should not need this for-loop, but it helps
    n_sentences = int(os.environ.get("N_SENTENCES", 15))  # Default: 15 sentences per task
    # Load models and languages
    models_df = pd.DataFrame(models)
    languages_df = pd.DataFrame(languages)
    print(f"🚀 Running full evaluation with {len(models_df)} models.")
    start_time = time.time()
    print(f"🚀 Starting full evaluation at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"📊 Evaluating {n_sentences} sentences per task")
    # Evaluate top languages by speakers (configurable via MAX_LANGUAGES env var)
    max_languages = int(os.environ.get("MAX_LANGUAGES", 2))  # Default: 2 for quick testing
    top_languages = languages.head(max_languages)  # Top N by population
    print(f"🌍 Evaluating top {len(top_languages)} languages by speakers (max: {max_languages})")
    # Load checkpoint if available
    checkpoint = load_checkpoint()
    start_batch = 0
    if checkpoint:
        start_batch = checkpoint['last_batch']
        print(f"🔄 Resuming from batch {start_batch}")
    # For testing, just use all available languages up to max_languages
    for n_languages in [min(max_languages, len(top_languages))]:
        print(f"running evaluations for {n_languages} languages")
        # Load existing results
        try:
            old_results = pd.read_json("results.json")
            if old_results.empty:
                old_results = pd.DataFrame(columns=["model", "bcp_47", "task", "metric", "origin", "score"])
        except FileNotFoundError:
            old_results = pd.DataFrame(columns=["model", "bcp_47", "task", "metric", "origin", "score"])
        try:
            old_models = pd.read_json("models.json")
        except FileNotFoundError:
            old_models = pd.DataFrame()
        # get all combinations of model, language and task
        combis = [
            (model, lang.bcp_47, task_name)
            for model in models_df["id"]
            for lang in top_languages.iloc[:n_languages].itertuples()
            for task_name, task in tasks.items()
            if task_name in models_df[models_df["id"] == model]["tasks"].iloc[0]
        ]
        # filter out combinations that have already been evaluated
        combis = pd.DataFrame(combis, columns=["model", "bcp_47", "task"])
        combis = combis.merge(old_results, on=["model", "bcp_47", "task"], how="left")
        combis = combis[combis["metric"].isna()][["model", "bcp_47", "task"]]
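        # Note: the left merge acts as an anti-join: combinations whose
        # "metric" column is NaN have no row in old_results yet and are the
        # ones that still need to be evaluated.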
        # run evaluations in batches to prevent HTTP pool exhaustion
        all_tasks = []
        for i in range(n_sentences):
            for model, bcp_47, task_name in combis.itertuples(index=False):
                # All tasks now use the same signature
                all_tasks.append((tasks[task_name], model, bcp_47, i))
        print(f"⏳ Processing {len(all_tasks)} evaluation tasks in batches...")
        batch_size = 200  # Process 200 tasks at a time (optimized for GitHub Actions)
        all_results = []
        # Calculate total batches for progress tracking
        total_batches = (len(all_tasks) + batch_size - 1) // batch_size
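        # total_batches is a ceiling division; the loop below starts at
        # start_batch * batch_size so that batches already covered by a
        # checkpoint from a previous run are skipped.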
        for i in range(start_batch * batch_size, len(all_tasks), batch_size):
            batch = all_tasks[i:i + batch_size]
            current_batch = i // batch_size + 1
            print(f"📦 Processing batch {current_batch}/{total_batches} ({len(batch)} tasks)")
            # Show what's being evaluated in this batch
            batch_summary = {}
            for task_data in batch:
                task_func, model, bcp_47, sentence_nr = task_data
                # Extract task name from function - handle both partial functions and regular functions
                if hasattr(task_func, 'func'):
                    task_name = task_func.func.__name__.replace('_and_evaluate', '')
                else:
                    task_name = task_func.__name__.replace('_and_evaluate', '')
                if task_name not in batch_summary:
                    batch_summary[task_name] = set()
                batch_summary[task_name].add(bcp_47)
            for task_name, languages_set in batch_summary.items():
                lang_list = ', '.join(sorted(languages_set))
                print(f" 🔄 {task_name}: {lang_list}")
            batch_coroutines = []
            for task_data in batch:
                task_func, model, bcp_47, sentence_nr = task_data
                batch_coroutines.append(task_func(model, bcp_47, sentence_nr))
            try:
                batch_results = await asyncio.gather(*batch_coroutines, return_exceptions=True)
                all_results.extend(batch_results)
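                # return_exceptions=True surfaces failed API calls as Exception
                # objects in batch_results instead of aborting the whole batch;
                # they are counted and dropped below.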
                # Save checkpoint after each batch
                valid_results = []
                exception_count = 0
                for r in batch_results:
                    if isinstance(r, Exception):
                        exception_count += 1
                        continue
                    if isinstance(r, list):
                        valid_results.extend(r)
                    else:
                        valid_results.append(r)
                if valid_results:
                    # Aggregate results
                    batch_df = pd.DataFrame(valid_results)
                    if len(batch_df) > 0:
                        batch_df = (
                            batch_df.groupby(["model", "bcp_47", "task", "metric", "origin"])
                            .agg({"score": "mean"})
                            .reset_index()
                        )
                        # Merge with existing results; drop_duplicates keeps the
                        # first occurrence, so rows already in old_results take
                        # precedence over freshly computed duplicates
                        all_results_df = pd.concat([old_results, batch_df])
                        all_results_df = all_results_df.drop_duplicates(subset=["model", "bcp_47", "task", "metric", "origin"])
                        all_results_df = all_results_df.sort_values(by=["model", "bcp_47", "task", "metric"])
                        # Save checkpoint
                        save_checkpoint(all_results_df, models_df, languages_df, current_batch, total_batches)
                        # Update old_results for next batch
                        old_results = all_results_df
                print(f"✅ Batch {current_batch} completed: {len(valid_results)} valid results, {exception_count} errors")
            except Exception as e:
                print(f"❌ Batch {current_batch} failed: {e}")
                # Save checkpoint even on failure
                if len(all_results) > 0:
                    results_df = pd.DataFrame(all_results)
                    save_checkpoint(results_df, models_df, languages_df, current_batch, total_batches)
                continue
            # Reduced delay between batches (optimized for GitHub Actions)
            await asyncio.sleep(0.5)
        # Final aggregation and save
        results = all_results
        # Filter out exceptions and flatten results
        valid_results = []
        exception_count = 0
        for r in results:
            if isinstance(r, Exception):
                exception_count += 1
                continue
            if isinstance(r, list):
                valid_results.extend(r)
            else:
                valid_results.append(r)
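        # At this point valid_results holds the flattened per-sentence scores
        # and exception_count the number of calls that failed (rate limits,
        # unavailable models, etc.), mirroring the per-batch handling above.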
print(f"โš ๏ธ Encountered {exception_count} API errors (model unavailable/rate limits)")
print(f"โœ… Successfully processed {len(valid_results)} evaluations")
# Save final results
if valid_results:
results = valid_results
args = dict(orient="records", indent=2, force_ascii=False)
# Aggregate results like main branch
results_df = pd.DataFrame(results)
if len(results_df) > 0:
results_df = (
results_df.groupby(["model", "bcp_47", "task", "metric", "origin"])
.agg({"score": "mean"})
.reset_index()
)
# Merge with old results
old_results = pd.read_json("results.json")
results_df = pd.concat([old_results, results_df])
results_df = results_df.drop_duplicates(subset=["model", "bcp_47", "task", "metric", "origin"])
results_df = results_df.sort_values(by=["model", "bcp_47", "task", "metric"])
results_df.to_json("results.json", **args)
print(f"๐Ÿ’พ Saved {len(results_df)} aggregated results to results.json")
else:
print("โš ๏ธ No valid results to aggregate")
else:
print("โš ๏ธ No valid results to save - all API calls failed")
        # Save up-to-date info on models and languages (like main branch)
        all_models = pd.concat([pd.DataFrame(models), old_models])
        all_models = all_models.drop_duplicates(subset=["id"]).sort_values(by=["id"])
        all_models.to_json("models.json", **args)
        pd.DataFrame(languages).to_json("languages.json", **args)
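        # models.json and languages.json are rewritten on every pass so the
        # metadata stays current even when no new evaluation results came in.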
        # Continue with next batch even if this one had errors
        # Time estimation
        elapsed = time.time() - start_time
        elapsed_str = str(timedelta(seconds=int(elapsed)))
        if n_languages < max_languages:
            remaining_batches = (max_languages - n_languages) // 10
            batch_count = max(1, n_languages // 10)  # Avoid division by zero
            estimated_remaining = elapsed * remaining_batches / batch_count
            eta = datetime.now() + timedelta(seconds=estimated_remaining)
            print(f"⏱️ Batch completed in {elapsed_str}. ETA for full run: {eta.strftime('%H:%M:%S')}")
        else:
            print(f"✅ Full evaluation completed in {elapsed_str}")
        print(f"🎉 Finished at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    # Clean up checkpoint file on successful completion
    if os.path.exists("checkpoint.json"):
        os.remove("checkpoint.json")
        print("🧹 Cleaned up checkpoint file")
    return results
if __name__ == "__main__":
    results = asyncio.run(evaluate())
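
# Example invocation (the file name evaluate.py is an assumption; the
# environment variables are the ones read above):
#   N_SENTENCES=30 MAX_LANGUAGES=10 python evaluate.py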